Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::HashTableConfig;
use super::LOAD_FACTOR;
use super::MAX_PAGE_SIZE;
use super::Payload;
use super::ProbeBatchStat;
use super::group_hash_entries;
use super::hash_index::AdapterImpl;
use super::hash_index::HashIndex;
Expand Down Expand Up @@ -113,6 +114,8 @@ impl AggregateHashTable {
count: 0,
capacity,
capacity_mask: capacity - 1,
probe_batch_stats: Vec::new(),
probe_batch_index: 0,
},
config,
hash_index_resize_count: 0,
Expand Down Expand Up @@ -402,7 +405,10 @@ impl AggregateHashTable {
return;
}
self.hash_index_resize_count += 1;
self.hash_index = HashIndex::with_capacity(new_capacity);
let mut hash_index = HashIndex::with_capacity(new_capacity);
hash_index.probe_batch_stats = std::mem::take(&mut self.hash_index.probe_batch_stats);
hash_index.probe_batch_index = self.hash_index.probe_batch_index;
self.hash_index = hash_index;
return;
}

Expand Down Expand Up @@ -433,6 +439,8 @@ impl AggregateHashTable {
}
}

hash_index.probe_batch_stats = std::mem::take(&mut self.hash_index.probe_batch_stats);
hash_index.probe_batch_index = self.hash_index.probe_batch_index;
self.hash_index = hash_index
}

Expand Down Expand Up @@ -463,4 +471,8 @@ impl AggregateHashTable {
/// Number of times the hash index has been rebuilt (counter is bumped in
/// the resize path before a new `HashIndex` is installed).
pub fn hash_index_resize_count(&self) -> usize {
self.hash_index_resize_count
}

/// Drain the per-batch probe-distance statistics collected by the
/// underlying hash index, leaving its internal buffer empty.
pub fn take_probe_batch_stats(&mut self) -> Vec<ProbeBatchStat> {
self.hash_index.take_probe_batch_stats()
}
}
60 changes: 54 additions & 6 deletions src/query/expression/src/aggregate/hash_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,20 @@ pub(super) struct HashIndex {
pub count: usize,
pub capacity: usize,
pub capacity_mask: usize,
pub probe_batch_stats: Vec<ProbeBatchStat>,
pub probe_batch_index: usize,
}

const INCREMENT_BITS: usize = 5;

/// Probe-distance statistics aggregated over one probe batch of the hash index.
#[derive(Clone, Debug)]
pub struct ProbeBatchStat {
/// Sequential index of the batch; incremented once per recorded batch
/// and preserved across hash-index resizes.
pub batch_index: usize,
/// Number of rows probed in this batch.
pub row_count: usize,
/// Mean probe distance (slot steps taken) over the batch's rows.
pub avg_probe_distance: f64,
/// Median probe distance over the batch's rows (mean of the two middle
/// values when the row count is even).
pub median_probe_distance: f64,
}

/// Derive an odd probing step from the high bits of the hash so the walk spans all slots.
///
/// this will generate a step in the range [1, 2^INCREMENT_BITS) based on hash and always odd.
Expand Down Expand Up @@ -60,26 +70,30 @@ impl HashIndex {
count: 0,
capacity,
capacity_mask,
probe_batch_stats: Vec::new(),
probe_batch_index: 0,
}
}

fn find_or_insert(&mut self, mut slot: usize, hash: u64) -> (usize, bool) {
fn find_or_insert(&mut self, mut slot: usize, hash: u64) -> (usize, bool, usize) {
let salt = Entry::hash_to_salt(hash);
let entries = self.entries.as_mut_slice();
let mut probe_distance = 0;
loop {
debug_assert!(entries.get(slot).is_some());
// SAFETY: slot is always in range
let entry = unsafe { entries.get_unchecked_mut(slot) };
if entry.is_occupied() {
if entry.get_salt() == salt {
return (slot, false);
return (slot, false, probe_distance);
} else {
slot = next_slot(slot, hash, self.capacity_mask);
probe_distance += 1;
continue;
}
} else {
entry.set_salt(salt);
return (slot, true);
return (slot, true, probe_distance);
}
}
}
Expand Down Expand Up @@ -186,6 +200,7 @@ impl HashIndex {
for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() {
*row = i.into();
state.slots[i] = init_slot(state.group_hashes[i], self.capacity_mask);
state.probe_distances[i] = 0;
}

let mut new_group_count = 0;
Expand All @@ -199,9 +214,10 @@ impl HashIndex {
// 1. inject new_group_count, new_entry_count, need_compare_count, no_match_count
for row in state.no_match_vector[..remaining_entries].iter().copied() {
let slot = &mut state.slots[row];
let is_new;

(*slot, is_new) = self.find_or_insert(*slot, state.group_hashes[row]);
let (new_slot, is_new, probe_distance) =
self.find_or_insert(*slot, state.group_hashes[row]);
*slot = new_slot;
state.probe_distances[row.to_usize()] += probe_distance;
if is_new {
state.empty_vector[new_entry_count] = row;
new_entry_count += 1;
Expand Down Expand Up @@ -246,14 +262,46 @@ impl HashIndex {
let slot = &mut state.slots[row];
let hash = state.group_hashes[row];
*slot = next_slot(*slot, hash, self.capacity_mask);
state.probe_distances[row.to_usize()] += 1;
}
remaining_entries = no_match_count;
}

self.record_probe_batch_stats(&state.probe_distances[..row_count]);
self.count += new_group_count;

new_group_count
}

/// Aggregate one batch's probe distances into a `ProbeBatchStat` and
/// append it to `probe_batch_stats`.
///
/// Empty batches are ignored. `probe_batch_index` is bumped once per
/// recorded batch so batches keep their arrival order across resizes.
fn record_probe_batch_stats(&mut self, probe_distances: &[usize]) {
    if probe_distances.is_empty() {
        return;
    }

    let count = probe_distances.len();
    // Sum straight off the input slice; no need to touch the clone for this.
    let sum: u64 = probe_distances.iter().map(|&d| d as u64).sum();
    let avg = sum as f64 / count as f64;

    // Only the middle element(s) are needed for the median, so an O(n)
    // selection replaces the previous O(n log n) full sort.
    let mut distances = probe_distances.to_vec();
    let mid = count / 2;
    let (lower, upper_mid, _) = distances.select_nth_unstable(mid);
    let upper_mid = *upper_mid as f64;
    let median = if count % 2 == 1 {
        upper_mid
    } else {
        // Even count: median is the mean of the two middle values. After
        // selection at `mid`, the lower middle is the max of the left part.
        let lower_mid = lower
            .iter()
            .copied()
            .max()
            .expect("even count >= 2 implies a non-empty lower partition")
            as f64;
        (lower_mid + upper_mid) / 2.0
    };

    self.probe_batch_stats.push(ProbeBatchStat {
        batch_index: self.probe_batch_index,
        row_count: count,
        avg_probe_distance: avg,
        median_probe_distance: median,
    });
    self.probe_batch_index += 1;
}

/// Move the collected probe-batch statistics out to the caller, leaving an
/// empty vector behind so collection can continue.
pub fn take_probe_batch_stats(&mut self) -> Vec<ProbeBatchStat> {
std::mem::take(&mut self.probe_batch_stats)
}
}

pub(super) struct AdapterImpl<'a> {
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use aggregate_function_state::*;
pub use aggregate_hashtable::*;
pub use group_hash::*;
use hash_index::Entry;
pub use hash_index::ProbeBatchStat;
pub use partitioned_payload::*;
pub use payload::*;
pub use payload_flush::*;
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/probe_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct ProbeState {
pub(super) group_compare_vector: [RowID; BATCH_SIZE],
pub(super) no_match_vector: [RowID; BATCH_SIZE],
pub(super) slots: [usize; BATCH_SIZE],
pub(super) probe_distances: [usize; BATCH_SIZE],
pub(super) row_count: usize,

pub partition_entries: Vec<(u16, SelectVector)>,
Expand All @@ -51,6 +52,7 @@ impl Default for ProbeState {
no_match_vector: [RowID::default(); BATCH_SIZE],
empty_vector: [RowID::default(); BATCH_SIZE],
slots: [0; BATCH_SIZE],
probe_distances: [0; BATCH_SIZE],

row_count: 0,
partition_entries: vec![],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::HashTableConfig;
use databend_common_expression::PayloadFlushState;
use databend_common_expression::ProbeBatchStat;
use databend_common_pipeline::core::Event;
use databend_common_pipeline::core::InputPort;
use databend_common_pipeline::core::OutputPort;
use databend_common_pipeline::core::Processor;
use databend_common_pipeline_transforms::MemorySettings;
use log::info;

use crate::pipelines::memory_settings::MemorySettingsExt;
use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
Expand Down Expand Up @@ -256,6 +258,8 @@ impl NewTransformFinalAggregate {
let config = ht.config.clone();

self.statistics.log_finish_statistics(&ht);
let probe_stats = ht.take_probe_batch_stats();
log_probe_batch_stats(&probe_stats);
let mut blocks = vec![];
self.flush_state.clear();

Expand Down Expand Up @@ -315,6 +319,23 @@ impl NewTransformFinalAggregate {
}
}

/// Log a summary of hash-index probe-distance statistics: the total batch
/// count, then per-batch detail for at most the most recent
/// `MAX_LOGGED_BATCHES` batches, to bound log volume.
fn log_probe_batch_stats(stats: &[ProbeBatchStat]) {
    // Upper bound on per-batch log lines emitted for one hash table.
    const MAX_LOGGED_BATCHES: usize = 100;

    if stats.is_empty() {
        return;
    }

    info!(
        "hash_index probe_distance batches: total_batches={}",
        stats.len()
    );
    // Take the tail slice directly instead of the `rev().take(100).rev()`
    // double-reverse trick; same batches logged, in the same order.
    let tail = &stats[stats.len().saturating_sub(MAX_LOGGED_BATCHES)..];
    for stat in tail {
        info!(
            "hash_index probe_distance batch={} count={} avg={:.3} median={:.3}",
            stat.batch_index, stat.row_count, stat.avg_probe_distance, stat.median_probe_distance
        );
    }
}

#[async_trait::async_trait]
impl Processor for NewTransformFinalAggregate {
fn name(&self) -> String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ use databend_common_expression::AggregateHashTable;
use databend_common_expression::DataBlock;
use databend_common_expression::HashTableConfig;
use databend_common_expression::PayloadFlushState;
use databend_common_expression::ProbeBatchStat;
use databend_common_pipeline::core::InputPort;
use databend_common_pipeline::core::OutputPort;
use databend_common_pipeline::core::Processor;
use databend_common_pipeline_transforms::processors::BlockMetaTransform;
use databend_common_pipeline_transforms::processors::BlockMetaTransformer;
use log::info;

use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
use crate::pipelines::processors::transforms::aggregator::aggregate_meta::AggregateMeta;
Expand Down Expand Up @@ -112,6 +114,9 @@ impl TransformFinalAggregate {
let mut blocks = vec![];
self.flush_state.clear();

let probe_stats = ht.take_probe_batch_stats();
log_probe_batch_stats(&probe_stats);

loop {
if ht.merge_result(&mut self.flush_state)? {
let mut entries = self.flush_state.take_aggregate_results();
Expand All @@ -134,6 +139,23 @@ impl TransformFinalAggregate {
}
}

/// Log a summary of hash-index probe-distance statistics: the total batch
/// count, then per-batch detail capped at the most recent
/// `MAX_LOGGED_BATCHES` batches.
///
/// The cap matches the sibling logger in the new final-aggregate transform
/// and keeps long-running queries from flooding the log (the previous
/// version logged every batch unbounded).
fn log_probe_batch_stats(stats: &[ProbeBatchStat]) {
    // Upper bound on per-batch log lines emitted for one hash table.
    const MAX_LOGGED_BATCHES: usize = 100;

    if stats.is_empty() {
        return;
    }

    info!(
        "hash_index probe_distance batches: total_batches={}",
        stats.len()
    );
    for stat in &stats[stats.len().saturating_sub(MAX_LOGGED_BATCHES)..] {
        info!(
            "hash_index probe_distance batch={} count={} avg={:.3} median={:.3}",
            stat.batch_index, stat.row_count, stat.avg_probe_distance, stat.median_probe_distance
        );
    }
}

impl BlockMetaTransform<AggregateMeta> for TransformFinalAggregate {
const NAME: &'static str = "TransformFinalAggregate";

Expand Down
Loading