From 9106bd56c514bb1154a1b7cbf8c7df9def8e405a Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 19 Jan 2026 17:42:17 +0800 Subject: [PATCH 01/25] save --- src/query/expression/src/aggregate/mod.rs | 3 +- .../src/aggregate/new_hash_index.rs | 55 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 src/query/expression/src/aggregate/new_hash_index.rs diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index d8629cb2bc557..a3dc02c3eb2ec 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -20,6 +20,7 @@ mod aggregate_function_state; mod aggregate_hashtable; mod group_hash; mod hash_index; +mod new_hash_index; mod partitioned_payload; mod payload; mod payload_flush; @@ -27,9 +28,9 @@ mod payload_row; mod probe_state; mod row_ptr; -use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use std::sync::Arc; pub use aggregate_function::*; pub use aggregate_function_state::*; diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs new file mode 100644 index 0000000000000..99b23f7c5e2ec --- /dev/null +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -0,0 +1,55 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Single tag in a control group. +#[derive(Copy, Clone, PartialEq, Eq)] +#[repr(transparent)] +struct Tag(u8); +impl Tag { + /// Control tag value for an empty bucket. + const EMPTY: Tag = Tag(0b1111_1111); + + /// Creates a control tag representing a full bucket with the given hash. + #[inline] + const fn full(hash: u64) -> Tag { + let top7 = hash >> (8 * 8 - 7); + Tag((top7 & 0x7f) as u8) // truncation + } +} + +#[derive(Copy, Clone)] +struct BitMask(u64); + +impl BitMask {} + +/// Helper function to replicate a tag across a `GroupWord`. +#[inline] +fn repeat(tag: Tag) -> u64 { + u64::from_ne_bytes([tag.0; Group::WIDTH]) +} + +#[derive(Copy, Clone)] +struct Group(u64); + +impl Group { + /// Number of bytes in the group. + const WIDTH: usize = 8; + + fn match_tag(self, tag: Tag) -> BitMask { + // This algorithm is derived from + // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord + let cmp = self.0 ^ repeat(tag); + BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) + } +} From b7ea7a5136f858dfed10be0999a8588b2907cfe3 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 20 Jan 2026 16:21:20 +0800 Subject: [PATCH 02/25] save --- .../src/aggregate/new_hash_index.rs | 137 +++++++++++++++++- src/query/expression/src/lib.rs | 2 + 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index 99b23f7c5e2ec..d9d2b63878acc 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -1,3 +1,4 @@ +// Copyright (c) 2016 Amanieu d'Antras // Copyright 2021 Datafuse Labs // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +13,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::hint::likely; +use std::num::NonZeroU64; +use std::ptr::NonNull; + +use databend_common_ast::parser::token::GROUP; + +use crate::ProbeState; +use crate::aggregate::hash_index::TableAdapter; +use crate::aggregate::row_ptr::RowPtr; + +/// Portions of this file are derived from excellent `hashbrown` crate: +/// https://github.com/rust-lang/hashbrown/ + /// Single tag in a control group. #[derive(Copy, Clone, PartialEq, Eq)] #[repr(transparent)] @@ -28,10 +42,69 @@ impl Tag { } } +const BITMASK_ITER_MASK: u64 = 0x8080_8080_8080_8080; + +const BITMASK_STRIDE: usize = 8; + +type NonZeroBitMaskWord = NonZeroU64; + #[derive(Copy, Clone)] struct BitMask(u64); -impl BitMask {} +impl BitMask { + #[inline] + #[must_use] + fn remove_lowest_bit(self) -> Self { + BitMask(self.0 & (self.0 - 1)) + } + + #[inline] + fn nonzero_trailing_zeros(nonzero: NonZeroBitMaskWord) -> usize { + if cfg!(target_arch = "arm") && BITMASK_STRIDE % 8 == 0 { + // SAFETY: A byte-swapped non-zero value is still non-zero. + let swapped = unsafe { NonZeroBitMaskWord::new_unchecked(nonzero.get().swap_bytes()) }; + swapped.leading_zeros() as usize / BITMASK_STRIDE + } else { + nonzero.trailing_zeros() as usize / BITMASK_STRIDE + } + } + + fn lowest_set_bit(self) -> Option { + if let Some(nonzero) = NonZeroBitMaskWord::new(self.0) { + Some(Self::nonzero_trailing_zeros(nonzero)) + } else { + None + } + } +} + +impl IntoIterator for BitMask { + type Item = usize; + type IntoIter = BitMaskIter; + + #[inline] + fn into_iter(self) -> BitMaskIter { + // A BitMask only requires each element (group of bits) to be non-zero. + // However for iteration we need each element to only contain 1 bit. + BitMaskIter(BitMask(self.0 & BITMASK_ITER_MASK)) + } +} + +/// Iterator over the contents of a `BitMask`, returning the indices of set +/// bits. +#[derive(Clone)] +struct BitMaskIter(BitMask); + +impl Iterator for BitMaskIter { + type Item = usize; + + #[inline] + fn next(&mut self) -> Option { + let bit = self.0.lowest_set_bit()?; + self.0 = self.0.remove_lowest_bit(); + Some(bit) + } +} /// Helper function to replicate a tag across a `GroupWord`. #[inline] @@ -52,4 +125,66 @@ impl Group { let cmp = self.0 ^ repeat(tag); BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) } + + #[inline] + pub(crate) fn match_empty_or_deleted(self) -> BitMask { + BitMask((self.0 & repeat(Tag(0x80))).to_le()) + } + + unsafe fn load(ctrls: &Vec, index: usize) -> Self { + unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } + } +} + +pub struct NewHashIndex { + ctrls: Vec, + pointers: Vec, + capacity: usize, + bucket_mask: usize, +} + +impl NewHashIndex { + pub fn with_capacity(capacity: usize) -> Self { + debug_assert!(capacity.is_power_of_two()); + // avoid handling: SMALL TABLE NASTY CORNER CASE + // This can happen for small (n < WIDTH) tables + debug_assert!(capacity >= Group::WIDTH); + } +} + +impl NewHashIndex { + #[inline] + fn find_insert_index_in_group(&self, group: &Group, pos: &usize) -> usize { + let bit = group.match_empty_or_deleted().lowest_set_bit(); + // SAFETY: This can happen for small (n < WIDTH) tables, because there are + // fake EMPTY bytes between us and the mirror bytes. The smallest table we + // are using is size 8192*4, which works fine. + (pos + bit.unwrap()) & self.bucket_mask + } + + pub fn find_or_insert(&mut self, hash: u64) -> (usize, bool) { + let tag_hash = Tag::full(hash); + let pos = hash as usize & self.bucket_mask; + loop { + let group = unsafe { Group::load(&self.ctrls, pos) }; + for bit in group.match_tag(tag_hash) { + let index = (pos + bit) & self.bucket_mask; + if likely(self.ctrls[index] == tag_hash) { + return (index, false); + } + } + let index = self.find_insert_index_in_group(&group, &pos); + } + + todo!() + } + + pub fn probe_and_create( + &mut self, + state: &mut ProbeState, + row_count: usize, + mut adapter: impl TableAdapter, + ) -> usize { + todo!() + } } diff --git a/src/query/expression/src/lib.rs b/src/query/expression/src/lib.rs index a1fa3bb8398de..78081491beabc 100755 --- a/src/query/expression/src/lib.rs +++ b/src/query/expression/src/lib.rs @@ -47,6 +47,8 @@ #![feature(debug_closure_helpers)] #![feature(never_type)] #![feature(iter_map_windows)] +#![feature(likely_unlikely)] + #[allow(dead_code)] mod block; From b3f5e8e07f288ef9e75fa8e17ac710188f4c9ffa Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 20 Jan 2026 16:46:27 +0800 Subject: [PATCH 03/25] finish find_or_insert --- .../src/aggregate/new_hash_index.rs | 64 +++++++++++++++---- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index d9d2b63878acc..bdf1414880c17 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -127,7 +127,7 @@ impl Group { } #[inline] - pub(crate) fn match_empty_or_deleted(self) -> BitMask { + pub(crate) fn match_empty(self) -> BitMask { BitMask((self.0 & repeat(Tag(0x80))).to_le()) } @@ -136,6 +136,27 @@ impl Group { } } +#[derive(Clone)] +struct ProbeSeq { + pos: usize, + stride: usize, +} + +impl ProbeSeq { + #[inline] + fn move_next(&mut self, bucket_mask: usize) { + // We should have found an empty bucket by now and ended the probe. + debug_assert!( + self.stride <= bucket_mask, + "Went past end of probe sequence" + ); + + self.stride += Group::WIDTH; + self.pos += self.stride; + self.pos &= bucket_mask; + } +} + pub struct NewHashIndex { ctrls: Vec, pointers: Vec, @@ -154,28 +175,43 @@ impl NewHashIndex { impl NewHashIndex { #[inline] - fn find_insert_index_in_group(&self, group: &Group, pos: &usize) -> usize { - let bit = group.match_empty_or_deleted().lowest_set_bit(); - // SAFETY: This can happen for small (n < WIDTH) tables, because there are - // fake EMPTY bytes between us and the mirror bytes. The smallest table we - // are using is size 8192*4, which works fine. - (pos + bit.unwrap()) & self.bucket_mask + fn probe_seq(&self, hash: u64) -> ProbeSeq { + ProbeSeq { + pos: hash as usize & self.bucket_mask, + stride: 0, + } + } + + #[inline] + fn find_insert_index_in_group(&self, group: &Group, pos: &usize) -> Option { + let bit = group.match_empty().lowest_set_bit(); + + if likely(bit.is_some()) { + Some((pos + bit.unwrap()) & self.bucket_mask) + } else { + None + } } pub fn find_or_insert(&mut self, hash: u64) -> (usize, bool) { + let mut insert_index = None; let tag_hash = Tag::full(hash); - let pos = hash as usize & self.bucket_mask; + let mut probe_seq = self.probe_seq(hash); loop { - let group = unsafe { Group::load(&self.ctrls, pos) }; + let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; for bit in group.match_tag(tag_hash) { - let index = (pos + bit) & self.bucket_mask; - if likely(self.ctrls[index] == tag_hash) { - return (index, false); - } + let index = (probe_seq.pos + bit) & self.bucket_mask; + return (index, false); } - let index = self.find_insert_index_in_group(&group, &pos); + insert_index = self.find_insert_index_in_group(&group, &probe_seq.pos); + if insert_index.is_some() { + return (insert_index.unwrap(), true); + } + probe_seq.move_next(self.bucket_mask); } + } + pub fn probe_slot(&mut self, hash: u64) -> usize { todo!() } From 4ed38bf5beea015303e5f7ee614ea132697d5ba1 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 20 Jan 2026 17:51:49 +0800 Subject: [PATCH 04/25] save --- .../src/aggregate/new_hash_index.rs | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index bdf1414880c17..107870ea4f5b3 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -20,6 +20,7 @@ use std::ptr::NonNull; use databend_common_ast::parser::token::GROUP; use crate::ProbeState; +use crate::aggregate::BATCH_SIZE; use crate::aggregate::hash_index::TableAdapter; use crate::aggregate::row_ptr::RowPtr; @@ -174,6 +175,24 @@ impl NewHashIndex { } impl NewHashIndex { + #[inline] + fn ctrl(&mut self, index: usize) -> *mut Tag { + debug_assert!(index < self.ctrls.len()); + unsafe { self.ctrls.as_mut_ptr().add(index) } + } + + #[inline] + fn set_ctrl(&mut self, index: usize, tag: Tag) { + // This is the same as `(index.wrapping_sub(Group::WIDTH)) % self.num_buckets() + Group::WIDTH` + // because the number of buckets is a power of two, and `self.bucket_mask = self.num_buckets() - 1`. + let index2 = ((index.wrapping_sub(Group::WIDTH)) & self.bucket_mask) + Group::WIDTH; + + unsafe { + *self.ctrl(index) = tag; + *self.ctrl(index2) = tag; + } + } + #[inline] fn probe_seq(&self, hash: u64) -> ProbeSeq { ProbeSeq { @@ -204,15 +223,22 @@ impl NewHashIndex { return (index, false); } insert_index = self.find_insert_index_in_group(&group, &probe_seq.pos); - if insert_index.is_some() { - return (insert_index.unwrap(), true); + if let Some(index) = insert_index { + return (index, true); } probe_seq.move_next(self.bucket_mask); } } pub fn probe_slot(&mut self, hash: u64) -> usize { - todo!() + let mut probe_seq = self.probe_seq(hash); + loop { + let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; + if let Some(index) = self.find_insert_index_in_group(&group, &probe_seq.pos) { + return index; + } + probe_seq.move_next(self.bucket_mask); + } } pub fn probe_and_create( @@ -221,6 +247,5 @@ impl NewHashIndex { row_count: usize, mut adapter: impl TableAdapter, ) -> usize { - todo!() } } From fc934a2af40792e8bbe1202b13f4e6ab4862f3a5 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 10:48:07 +0800 Subject: [PATCH 05/25] add new hash index --- .../src/aggregate/new_hash_index.rs | 198 +++++++++++++++++- .../expression/src/aggregate/probe_state.rs | 2 + 2 files changed, 198 insertions(+), 2 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index 107870ea4f5b3..bf50678fec1df 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -171,6 +171,15 @@ impl NewHashIndex { // avoid handling: SMALL TABLE NASTY CORNER CASE // This can happen for small (n < WIDTH) tables debug_assert!(capacity >= Group::WIDTH); + let bucket_mask = capacity - 1; + let ctrls = vec![Tag::EMPTY; capacity + Group::WIDTH]; + let pointers = vec![RowPtr::null(); capacity]; + Self { + ctrls, + pointers, + capacity, + bucket_mask, + } } } @@ -212,7 +221,7 @@ impl NewHashIndex { } } - pub fn find_or_insert(&mut self, hash: u64) -> (usize, bool) { + pub fn find_or_insert(&mut self, hash: u64, mut skip: usize) -> (usize, bool) { let mut insert_index = None; let tag_hash = Tag::full(hash); let mut probe_seq = self.probe_seq(hash); @@ -220,7 +229,10 @@ impl NewHashIndex { let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; for bit in group.match_tag(tag_hash) { let index = (probe_seq.pos + bit) & self.bucket_mask; - return (index, false); + if likely(skip == 0) { + return (index, false); + } + skip -= 1; } insert_index = self.find_insert_index_in_group(&group, &probe_seq.pos); if let Some(index) = insert_index { @@ -247,5 +259,187 @@ impl NewHashIndex { row_count: usize, mut adapter: impl TableAdapter, ) -> usize { + for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() { + *row = i.into(); + state.probe_skip[i] = 0; + } + + let mut new_group_count = 0; + let mut remaining_entries = row_count; + + while remaining_entries > 0 { + let mut new_entry_count = 0; + let mut need_compare_count = 0; + let mut no_match_count = 0; + + for row in state.no_match_vector[..remaining_entries].iter().copied() { + let skip = state.probe_skip[row]; + let (slot, is_new) = self.find_or_insert(state.group_hashes[row], skip); + state.slots[row] = slot; + + if is_new { + state.empty_vector[new_entry_count] = row; + new_entry_count += 1; + } else { + state.group_compare_vector[need_compare_count] = row; + need_compare_count += 1; + } + } + + if new_entry_count != 0 { + new_group_count += new_entry_count; + + adapter.append_rows(state, new_entry_count); + + for row in state.empty_vector[..new_entry_count].iter().copied() { + let slot = state.slots[row]; + self.set_ctrl(slot, Tag::full(state.group_hashes[row])); + self.pointers[slot] = state.addresses[row]; + } + } + + if need_compare_count > 0 { + for row in state.group_compare_vector[..need_compare_count] + .iter() + .copied() + { + let slot = state.slots[row]; + state.addresses[row] = self.pointers[slot]; + } + + no_match_count = adapter.compare(state, need_compare_count, no_match_count); + } + + for row in state.no_match_vector[..no_match_count].iter().copied() { + state.probe_skip[row] += 1; + } + + remaining_entries = no_match_count; + } + + new_group_count + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::ProbeState; + + struct TestTableAdapter { + incoming: Vec<(u64, u64)>, // (key, hash) + payload: Vec<(u64, u64, u64)>, // (key, hash, value) + init_count: usize, + pin_data: Box<[u8]>, + } + + impl TestTableAdapter { + fn new(incoming: Vec<(u64, u64)>, payload: Vec<(u64, u64, u64)>) -> Self { + Self { + incoming, + init_count: payload.len(), + payload, + pin_data: vec![0; 1000].into(), + } + } + + fn init_state(&self) -> ProbeState { + let mut state = ProbeState { + row_count: self.incoming.len(), + ..Default::default() + }; + + for (i, (_, hash)) in self.incoming.iter().enumerate() { + state.group_hashes[i] = *hash + } + + state + } + + fn init_hash_index(&self, hash_index: &mut NewHashIndex) { + for (i, (_, hash, _)) in self.payload.iter().copied().enumerate() { + let slot = hash_index.probe_slot(hash); + hash_index.set_ctrl(slot, Tag::full(hash)); + hash_index.pointers[slot] = self.get_row_ptr(false, i); + } + } + + fn get_row_ptr(&self, incoming: bool, row: usize) -> RowPtr { + RowPtr::new(unsafe { + self.pin_data + .as_ptr() + .add(if incoming { row + self.init_count } else { row }) as _ + }) + } + + fn get_payload(&self, row_ptr: RowPtr) -> (u64, u64, u64) { + let index = row_ptr.as_ptr() as usize - self.pin_data.as_ptr() as usize; + self.payload[index] + } + } + + impl TableAdapter for &mut TestTableAdapter { + fn append_rows(&mut self, state: &mut ProbeState, new_entry_count: usize) { + for row in state.empty_vector[..new_entry_count].iter() { + let (key, hash) = self.incoming[*row]; + let value = key + 20; + + self.payload.push((key, hash, value)); + state.addresses[*row] = self.get_row_ptr(true, row.to_usize()); + } + } + + fn compare( + &mut self, + state: &mut ProbeState, + need_compare_count: usize, + mut no_match_count: usize, + ) -> usize { + for row in state.group_compare_vector[..need_compare_count] + .iter() + .copied() + { + let incoming = self.incoming[row]; + let (key, _, _) = self.get_payload(state.addresses[row]); + if incoming.0 == key { + continue; + } + + state.no_match_vector[no_match_count] = row; + no_match_count += 1; + } + + no_match_count + } + } + + #[test] + fn test_new_hash_index_tag_collision_skip() { + let capacity = 16; + let hash1 = 0x7f00_0000_0000_0001; + let hash2 = 0x7f00_0000_0000_0002; + + let mut hash_index = NewHashIndex::with_capacity(capacity); + let mut adapter = TestTableAdapter::new(vec![(2, hash2)], vec![(1, hash1, 100)]); + let mut state = adapter.init_state(); + + adapter.init_hash_index(&mut hash_index); + + let count = + hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); + assert_eq!(1, count); + + let got = state.addresses[..state.row_count] + .iter() + .map(|row_ptr| { + let (key, _, value) = adapter.get_payload(*row_ptr); + (key, value) + }) + .collect::>(); + + let want = HashMap::from_iter([(2, 22)]); + assert_eq!(want, got); } } diff --git a/src/query/expression/src/aggregate/probe_state.rs b/src/query/expression/src/aggregate/probe_state.rs index 7b9b04243fb26..058a0e12c351e 100644 --- a/src/query/expression/src/aggregate/probe_state.rs +++ b/src/query/expression/src/aggregate/probe_state.rs @@ -28,6 +28,7 @@ pub struct ProbeState { pub(super) group_hashes: [u64; BATCH_SIZE], pub(super) addresses: [RowPtr; BATCH_SIZE], pub(super) page_index: [usize; BATCH_SIZE], + pub(super) probe_skip: [usize; BATCH_SIZE], pub(super) state_places: [StateAddr; BATCH_SIZE], pub(super) empty_vector: SelectVector, @@ -46,6 +47,7 @@ impl Default for ProbeState { group_hashes: [0; BATCH_SIZE], addresses: [RowPtr::null(); BATCH_SIZE], page_index: [0; BATCH_SIZE], + probe_skip: [0; BATCH_SIZE], state_places: [StateAddr::null(); BATCH_SIZE], group_compare_vector: [RowID::default(); BATCH_SIZE], no_match_vector: [RowID::default(); BATCH_SIZE], From caa2f045ccb6f1e599bd4b8a8a6fa63440fcaf4b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 10:55:10 +0800 Subject: [PATCH 06/25] add settings --- src/query/settings/src/settings_default.rs | 7 +++++++ src/query/settings/src/settings_getter_setter.rs | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index cb1e9dced0070..5451de6baf767 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1521,6 +1521,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::String(vec!["auto".into(),"row".into(), "bucket".into()])), }), + ("enable_experiment_hash_index", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "experiment setting enable hash index(disable by default).", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index c4f284d09b752..83344aa42237a 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1147,4 +1147,8 @@ impl Settings { pub fn get_force_aggregate_shuffle_mode(&self) -> Result { self.try_get_string("force_aggregate_shuffle_mode") } + + pub fn get_enable_experiment_hash_index(&self) -> Result { + Ok(self.try_get_u64("enable_experiment_hash_index")? != 0) + } } From ae44965a2a8b3c220a57764e9678c74a1917dff0 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 11:14:27 +0800 Subject: [PATCH 07/25] refactor hash index, unified create method --- .../src/aggregate/aggregate_hashtable.rs | 54 +++++++------------ .../expression/src/aggregate/hash_index.rs | 34 ++++++++++++ 2 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 19774a6d437a8..0e141ced6fe1e 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -21,7 +21,6 @@ use bumpalo::Bump; use databend_common_exception::Result; use super::BATCH_SIZE; -use super::Entry; use super::HashTableConfig; use super::LOAD_FACTOR; use super::MAX_PAGE_SIZE; @@ -94,10 +93,12 @@ impl AggregateHashTable { need_init_entry: bool, ) -> Self { debug_assert!(capacity.is_power_of_two()); - let entries = if need_init_entry { - vec![Entry::default(); capacity] + // if need_init_entry is false, we will directly append rows without probing hash index + // so we can use a dummy hash index, which is not allowed to insert any entry + let hash_index = if need_init_entry { + HashIndex::with_capacity(capacity) } else { - vec![] + HashIndex::dummy() }; Self { direct_append: !need_init_entry, @@ -108,12 +109,7 @@ impl AggregateHashTable { 1 << config.initial_radix_bits, vec![arena], ), - hash_index: HashIndex { - entries, - count: 0, - capacity, - capacity_mask: capacity - 1, - }, + hash_index, config, hash_index_resize_count: 0, } @@ -407,33 +403,19 @@ impl AggregateHashTable { } self.hash_index_resize_count += 1; - let mut hash_index = HashIndex::with_capacity(new_capacity); - - // iterate over payloads and copy to new entries - for payload in self.payload.payloads.iter() { - for page in payload.pages.iter() { - for idx in 0..page.rows { - let row_ptr = page.data_ptr(idx, payload.tuple_size); - let hash = row_ptr.hash(&payload.row_layout); - - let slot = hash_index.probe_slot(hash); - - // set value - let entry = hash_index.mut_entry(slot); - debug_assert!(!entry.is_occupied()); - entry.set_hash(hash); - entry.set_pointer(row_ptr); - - debug_assert!(entry.is_occupied()); - debug_assert_eq!(entry.get_pointer(), row_ptr); - debug_assert_eq!(entry.get_salt(), Entry::hash_to_salt(hash)); - - hash_index.count += 1; - } - } - } + let iter = self.payload.payloads.iter().flat_map(|payload| { + let row_layout = &payload.row_layout; + let tuple_size = payload.tuple_size; + payload.pages.iter().flat_map(move |page| { + (0..page.rows).map(move |idx| { + let row_ptr = page.data_ptr(idx, tuple_size); + let hash = row_ptr.hash(row_layout); + (hash, row_ptr) + }) + }) + }); - self.hash_index = hash_index + self.hash_index = HashIndex::rebuild_from_iter(new_capacity, iter); } fn initial_capacity() -> usize { diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index b8032982b828d..0ad4b98299bf3 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -52,6 +52,18 @@ fn init_slot(hash: u64, capacity_mask: usize) -> usize { } impl HashIndex { + /// Create a dummy HashIndex with zero capacity. + /// Any operation on this HashIndex is not allowed. + pub fn dummy() -> Self { + Self { + entries: vec![], + count: 0, + capacity: 0, + capacity_mask: 0, + } + } + + /// Create a HashIndex with the given capacity. pub fn with_capacity(capacity: usize) -> Self { debug_assert!(capacity.is_power_of_two()); let capacity_mask = capacity - 1; @@ -63,6 +75,28 @@ impl HashIndex { } } + pub(super) fn rebuild_from_iter(capacity: usize, iter: I) -> Self + where I: IntoIterator { + let mut hash_index = HashIndex::with_capacity(capacity); + + for (hash, row_ptr) in iter { + let slot = hash_index.probe_slot(hash); + + let entry = hash_index.mut_entry(slot); + debug_assert!(!entry.is_occupied()); + entry.set_hash(hash); + entry.set_pointer(row_ptr); + + debug_assert!(entry.is_occupied()); + debug_assert_eq!(entry.get_pointer(), row_ptr); + debug_assert_eq!(entry.get_salt(), Entry::hash_to_salt(hash)); + + hash_index.count += 1; + } + + hash_index + } + fn find_or_insert(&mut self, mut slot: usize, hash: u64) -> (usize, bool) { let salt = Entry::hash_to_salt(hash); let entries = self.entries.as_mut_slice(); From 551c94f63cf64ba9ff861fb4bb7b2ed2744aec4b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 11:53:29 +0800 Subject: [PATCH 08/25] use settings to switch hash index --- .../src/aggregate/aggregate_hashtable.rs | 33 +++++----- .../expression/src/aggregate/hash_index.rs | 47 ++++++++++++- src/query/expression/src/aggregate/mod.rs | 37 +++++++++++ .../src/aggregate/new_hash_index.rs | 66 ++++++++++++++++++- .../physical_aggregate_final.rs | 3 + .../physical_aggregate_partial.rs | 5 ++ .../pipelines/builders/builder_aggregate.rs | 2 + .../transforms/aggregator/aggregate_meta.rs | 7 +- .../aggregator/aggregator_params.rs | 3 + .../new_transform_final_aggregate.rs | 4 +- .../aggregator/serde/aggregate_scatter.rs | 2 + .../aggregator/transform_aggregate_final.rs | 8 ++- .../aggregator/transform_partition_bucket.rs | 1 + 13 files changed, 197 insertions(+), 21 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 0e141ced6fe1e..23bcf9d234f62 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -27,7 +27,7 @@ use super::MAX_PAGE_SIZE; use super::Payload; use super::group_hash_entries; use super::hash_index::AdapterImpl; -use super::hash_index::HashIndex; +use super::hash_index::HashIndexOps; use super::partitioned_payload::PartitionedPayload; use super::payload_flush::PayloadFlushState; use super::probe_state::ProbeState; @@ -44,7 +44,7 @@ pub struct AggregateHashTable { pub config: HashTableConfig, current_radix_bits: u64, - hash_index: HashIndex, + hash_index: Box, hash_index_resize_count: usize, } @@ -78,7 +78,7 @@ impl AggregateHashTable { 1 << config.initial_radix_bits, vec![arena], ), - hash_index: HashIndex::with_capacity(capacity), + hash_index: config.new_hash_index(capacity), config, hash_index_resize_count: 0, } @@ -96,9 +96,9 @@ impl AggregateHashTable { // if need_init_entry is false, we will directly append rows without probing hash index // so we can use a dummy hash index, which is not allowed to insert any entry let hash_index = if need_init_entry { - HashIndex::with_capacity(capacity) + config.new_hash_index(capacity) } else { - HashIndex::dummy() + config.new_dummy_hash_index() }; Self { direct_append: !need_init_entry, @@ -230,8 +230,8 @@ impl AggregateHashTable { if self.config.partial_agg { // check size - if self.hash_index.count + BATCH_SIZE > self.hash_index.resize_threshold() - && self.hash_index.capacity >= self.config.max_partial_capacity + if self.hash_index.count() + BATCH_SIZE > self.hash_index.resize_threshold() + && self.hash_index.capacity() >= self.config.max_partial_capacity { self.clear_ht(); } @@ -252,15 +252,16 @@ impl AggregateHashTable { row_count: usize, ) -> usize { // exceed capacity or should resize - if row_count + self.hash_index.count > self.hash_index.resize_threshold() { - self.resize(self.hash_index.capacity * 2); + if row_count + self.hash_index.count() > self.hash_index.resize_threshold() { + self.resize(self.hash_index.capacity() * 2); } + let mut adapter = AdapterImpl { + payload: &mut self.payload, + group_columns, + }; self.hash_index - .probe_and_create(state, row_count, AdapterImpl { - payload: &mut self.payload, - group_columns, - }) + .probe_and_create(state, row_count, &mut adapter) } pub fn combine(&mut self, other: Self, flush_state: &mut PayloadFlushState) -> Result<()> { @@ -394,11 +395,11 @@ impl AggregateHashTable { // scan payload to reconstruct PointArray fn resize(&mut self, new_capacity: usize) { if self.config.partial_agg { - if self.hash_index.capacity == self.config.max_partial_capacity { + if self.hash_index.capacity() == self.config.max_partial_capacity { return; } self.hash_index_resize_count += 1; - self.hash_index = HashIndex::with_capacity(new_capacity); + self.hash_index = self.config.new_hash_index(new_capacity); return; } @@ -415,7 +416,7 @@ impl AggregateHashTable { }) }); - self.hash_index = HashIndex::rebuild_from_iter(new_capacity, iter); + self.hash_index = self.config.rebuild_hash_index(new_capacity, iter); } fn initial_capacity() -> usize { diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index 0ad4b98299bf3..c6e45ec709dd0 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -210,12 +210,26 @@ pub(super) trait TableAdapter { ) -> usize; } +pub(super) trait HashIndexOps: Send + Sync { + fn capacity(&self) -> usize; + fn count(&self) -> usize; + fn resize_threshold(&self) -> usize; + fn allocated_bytes(&self) -> usize; + fn reset(&mut self); + fn probe_and_create( + &mut self, + state: &mut ProbeState, + row_count: usize, + adapter: &mut dyn TableAdapter, + ) -> usize; +} + impl HashIndex { pub fn probe_and_create( &mut self, state: &mut ProbeState, row_count: usize, - mut adapter: impl TableAdapter, + adapter: &mut dyn TableAdapter, ) -> usize { for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() { *row = i.into(); @@ -290,6 +304,37 @@ impl HashIndex { } } +impl HashIndexOps for HashIndex { + fn capacity(&self) -> usize { + self.capacity + } + + fn count(&self) -> usize { + self.count + } + + fn resize_threshold(&self) -> usize { + HashIndex::resize_threshold(self) + } + + fn allocated_bytes(&self) -> usize { + HashIndex::allocated_bytes(self) + } + + fn reset(&mut self) { + HashIndex::reset(self) + } + + fn probe_and_create( + &mut self, + state: &mut ProbeState, + row_count: usize, + adapter: &mut dyn TableAdapter, + ) -> usize { + HashIndex::probe_and_create(self, state, row_count, adapter) + } +} + pub(super) struct AdapterImpl<'a> { pub payload: &'a mut PartitionedPayload, pub group_columns: ProjectedBlock<'a>, diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index a3dc02c3eb2ec..3767fc746bd00 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -37,6 +37,7 @@ pub use aggregate_function_state::*; pub use aggregate_hashtable::*; pub use group_hash::*; use hash_index::Entry; +use hash_index::HashIndexOps; pub use partitioned_payload::*; pub use payload::*; pub use payload_flush::*; @@ -69,6 +70,7 @@ pub struct HashTableConfig { pub block_fill_factor: f64, pub partial_agg: bool, pub max_partial_capacity: usize, + pub enable_experiment_hash_index: bool, } impl Default for HashTableConfig { @@ -81,6 +83,7 @@ impl Default for HashTableConfig { block_fill_factor: 1.8, partial_agg: false, max_partial_capacity: 131072, + enable_experiment_hash_index: false, } } } @@ -119,6 +122,11 @@ impl HashTableConfig { self } + pub fn with_experiment_hash_index(mut self, enable: bool) -> Self { + self.enable_experiment_hash_index = enable; + self + } + pub fn with_partial(mut self, partial_agg: bool, active_threads: usize) -> Self { self.partial_agg = partial_agg; @@ -160,4 +168,33 @@ impl HashTableConfig { break; } } + + fn new_hash_index(&self, capacity: usize) -> Box { + if self.enable_experiment_hash_index { + Box::new(new_hash_index::NewHashIndex::with_capacity(capacity)) + } else { + Box::new(hash_index::HashIndex::with_capacity(capacity)) + } + } + + fn new_dummy_hash_index(&self) -> Box { + if self.enable_experiment_hash_index { + Box::new(new_hash_index::NewHashIndex::dummy()) + } else { + Box::new(hash_index::HashIndex::dummy()) + } + } + + fn rebuild_hash_index( + &self, + capacity: usize, + iter: I, + ) -> Box + where I: IntoIterator { + if self.enable_experiment_hash_index { + Box::new(new_hash_index::NewHashIndex::rebuild_from_iter(capacity, iter)) + } else { + Box::new(hash_index::HashIndex::rebuild_from_iter(capacity, iter)) + } + } } diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index bf50678fec1df..73c4e6644d809 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -21,6 +21,7 @@ use databend_common_ast::parser::token::GROUP; use crate::ProbeState; use crate::aggregate::BATCH_SIZE; +use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; use crate::aggregate::row_ptr::RowPtr; @@ -163,6 +164,7 @@ pub struct NewHashIndex { pointers: Vec, capacity: usize, bucket_mask: usize, + count: usize, } impl NewHashIndex { @@ -179,8 +181,31 @@ impl NewHashIndex { pointers, capacity, bucket_mask, + count: 0, } } + + pub fn dummy() -> Self { + Self { + ctrls: vec![], + pointers: vec![], + capacity: 0, + bucket_mask: 0, + count: 0, + } + } + + pub fn rebuild_from_iter(capacity: usize, iter: I) -> Self + where I: IntoIterator { + let mut hash_index = NewHashIndex::with_capacity(capacity); + for (hash, row_ptr) in iter { + let slot = hash_index.probe_slot(hash); + hash_index.set_ctrl(slot, Tag::full(hash)); + hash_index.pointers[slot] = row_ptr; + hash_index.count += 1; + } + hash_index + } } impl NewHashIndex { @@ -257,8 +282,9 @@ impl NewHashIndex { &mut self, state: &mut ProbeState, row_count: usize, - mut adapter: impl TableAdapter, + adapter: &mut dyn TableAdapter, ) -> usize { + debug_assert!(self.capacity > 0); for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() { *row = i.into(); state.probe_skip[i] = 0; @@ -317,10 +343,48 @@ impl NewHashIndex { remaining_entries = no_match_count; } + self.count += new_group_count; new_group_count } } +impl HashIndexOps for NewHashIndex { + fn capacity(&self) -> usize { + self.capacity + } + + fn count(&self) -> usize { + self.count + } + + fn resize_threshold(&self) -> usize { + (self.capacity as f64 / super::LOAD_FACTOR) as usize + } + + fn allocated_bytes(&self) -> usize { + self.ctrls.len() * std::mem::size_of::() + + self.pointers.len() * std::mem::size_of::() + } + + fn reset(&mut self) { + if self.capacity == 0 { + return; + } + self.count = 0; + self.ctrls.fill(Tag::EMPTY); + self.pointers.fill(RowPtr::null()); + } + + fn probe_and_create( + &mut self, + state: &mut ProbeState, + row_count: usize, + adapter: &mut dyn TableAdapter, + ) -> usize { + NewHashIndex::probe_and_create(self, state, row_count, adapter) + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs index a8af4228a15df..207c6a4af43e7 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_final.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs @@ -153,6 +153,8 @@ impl IPhysicalPlan for AggregateFinal { let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?; let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?; let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; + let enable_experiment_hash_index = + builder.settings.get_enable_experiment_hash_index()?; let mut is_cluster_aggregate = false; if ExchangeSource::check_physical_plan(&self.input) { @@ -166,6 +168,7 @@ impl IPhysicalPlan for AggregateFinal { is_cluster_aggregate, max_spill_io_requests as usize, enable_experiment_aggregate, + enable_experiment_hash_index, max_block_rows, max_block_bytes, )?; diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index 959fc38faa9ef..a9a20fa6e4995 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -179,6 +179,8 @@ impl IPhysicalPlan for AggregatePartial { let max_threads = builder.settings.get_max_threads()?; let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?; let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; + let enable_experiment_hash_index = + builder.settings.get_enable_experiment_hash_index()?; let cluster = &builder.ctx.get_cluster(); let params = PipelineBuilder::build_aggregator_params( @@ -188,6 +190,7 @@ impl IPhysicalPlan for AggregatePartial { builder.is_exchange_parent(), max_spill_io_requests as usize, enable_experiment_aggregate, + enable_experiment_hash_index, max_block_rows, max_block_bytes, )?; @@ -216,6 +219,8 @@ impl IPhysicalPlan for AggregatePartial { .cluster_with_partial(true, builder.ctx.get_cluster().nodes.len()) } }; + let partial_agg_config = + partial_agg_config.with_experiment_hash_index(enable_experiment_hash_index); // For rank limit, we can filter data using sort with rank before partial. if let Some((sort_desc, limit)) = diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index 1a55e06b6aa24..899451f1eb083 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -37,6 +37,7 @@ impl PipelineBuilder { cluster_aggregator: bool, max_spill_io_requests: usize, enable_experiment_aggregate: bool, + enable_experiment_hash_index: bool, max_block_rows: usize, max_block_bytes: usize, ) -> Result> { @@ -132,6 +133,7 @@ impl PipelineBuilder { cluster_aggregator, max_spill_io_requests, enable_experiment_aggregate, + enable_experiment_hash_index, max_block_rows, max_block_bytes, )?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index 90f91d8734d55..7fba78d8892bb 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -53,12 +53,15 @@ impl SerializedPayload { aggrs: Vec>, num_states: usize, radix_bits: u64, + enable_experiment_hash_index: bool, arena: Arc, need_init_entry: bool, ) -> Result { let rows_num = self.data_block.num_rows(); let capacity = AggregateHashTable::get_capacity_for_count(rows_num); - let config = HashTableConfig::default().with_initial_radix_bits(radix_bits); + let config = HashTableConfig::default() + .with_initial_radix_bits(radix_bits) + .with_experiment_hash_index(enable_experiment_hash_index); let mut state = ProbeState::default(); let group_len = group_types.len(); let mut hashtable = AggregateHashTable::new_directly( @@ -94,6 +97,7 @@ impl SerializedPayload { aggrs: Vec>, num_states: usize, radix_bits: u64, + enable_experiment_hash_index: bool, arena: Arc, ) -> Result { let hashtable = self.convert_to_aggregate_table( @@ -101,6 +105,7 @@ impl SerializedPayload { aggrs, num_states, radix_bits, + enable_experiment_hash_index, arena, false, )?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs index d615f3c05afc5..ace33e12f1ecf 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs @@ -41,6 +41,7 @@ pub struct AggregatorParams { pub max_spill_io_requests: usize, pub enable_experiment_aggregate: bool, + pub enable_experiment_hash_index: bool, pub max_block_rows: usize, pub max_block_bytes: usize, @@ -57,6 +58,7 @@ impl AggregatorParams { cluster_aggregator: bool, max_spill_io_requests: usize, enable_experiment_aggregate: bool, + enable_experiment_hash_index: bool, max_block_rows: usize, max_block_bytes: usize, ) -> Result> { @@ -78,6 +80,7 @@ impl AggregatorParams { max_block_bytes, max_spill_io_requests, enable_experiment_aggregate, + enable_experiment_hash_index, })) } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 81fe71cc24448..611854509b5a1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -96,7 +96,8 @@ impl NewTransformFinalAggregate { params.group_data_types.clone(), params.aggregate_functions.clone(), HashTableConfig::default() - .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64), + .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64) + .with_experiment_hash_index(params.enable_experiment_hash_index), Arc::new(Bump::new()), ); let flush_state = PayloadFlushState::default(); @@ -148,6 +149,7 @@ impl NewTransformFinalAggregate { self.params.aggregate_functions.clone(), self.params.num_states(), 0, + self.params.enable_experiment_hash_index, Arc::new(Bump::new()), )?; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/aggregate_scatter.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/aggregate_scatter.rs index c94b8a1c86f39..8f6e03dde1033 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/aggregate_scatter.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/aggregate_scatter.rs @@ -177,6 +177,7 @@ impl AggregateRowScatter { self.aggregate_params.aggregate_functions.clone(), self.aggregate_params.num_states(), 0, + self.aggregate_params.enable_experiment_hash_index, Arc::new(Bump::new()), )?; let payload = partition.payloads.pop(); @@ -220,6 +221,7 @@ impl AggregateRowScatter { self.aggregate_params.aggregate_functions.clone(), self.aggregate_params.num_states(), 0, + self.aggregate_params.enable_experiment_hash_index, Arc::new(Bump::new()), )?; let payload = partition.payloads.pop(); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index e095bc3f6b17d..7e40b46f5828f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -66,6 +66,7 @@ impl TransformFinalAggregate { self.params.aggregate_functions.clone(), self.params.num_states(), 0, + self.params.enable_experiment_hash_index, Arc::new(Bump::new()), )?; ht.combine_payloads(&payload, &mut self.flush_state)?; @@ -77,6 +78,7 @@ impl TransformFinalAggregate { self.params.aggregate_functions.clone(), self.params.num_states(), 0, + self.params.enable_experiment_hash_index, Arc::new(Bump::new()), true, )?); @@ -94,7 +96,11 @@ impl TransformFinalAggregate { let mut hashtable = AggregateHashTable::new_with_capacity( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), - HashTableConfig::default().with_initial_radix_bits(0), + HashTableConfig::default() + .with_initial_radix_bits(0) + .with_experiment_hash_index( + self.params.enable_experiment_hash_index, + ), capacity, Arc::new(Bump::new()), ); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs index e0d7b5cbee7d0..11b698f7b6104 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs @@ -362,6 +362,7 @@ impl TransformPartitionBucket { self.params.aggregate_functions.clone(), self.params.num_states(), 0, + self.params.enable_experiment_hash_index, Arc::new(Bump::new()), )?; From 5a6ea7857e6587edd515a5220a1dcc985be94830 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 13:27:11 +0800 Subject: [PATCH 09/25] save --- src/query/expression/src/aggregate/hash_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index c6e45ec709dd0..1104bc23ee36d 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -210,7 +210,7 @@ pub(super) trait TableAdapter { ) -> usize; } -pub(super) trait HashIndexOps: Send + Sync { +pub(super) trait HashIndexOps { fn capacity(&self) -> usize; fn count(&self) -> usize; fn resize_threshold(&self) -> usize; From e8ee9e734be0732d4d37241e51aba2327e23a603 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 14:38:43 +0800 Subject: [PATCH 10/25] fmt --- .../expression/src/aggregate/hash_index.rs | 2 +- src/query/expression/src/aggregate/mod.rs | 12 +++---- .../src/aggregate/new_hash_index.rs | 34 ++++++++++++------- .../physical_aggregate_final.rs | 3 +- .../physical_aggregate_partial.rs | 3 +- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index 1104bc23ee36d..0959d693ec59e 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -430,7 +430,7 @@ mod tests { } } - impl TableAdapter for &mut TestTableAdapter { + impl TableAdapter for TestTableAdapter { fn append_rows(&mut self, state: &mut ProbeState, new_entry_count: usize) { for row in state.empty_vector[..new_entry_count].iter() { let (key, hash) = self.incoming[*row]; diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 3767fc746bd00..6087a2d2c7229 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -28,9 +28,9 @@ mod payload_row; mod probe_state; mod row_ptr; +use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::Arc; pub use aggregate_function::*; pub use aggregate_function_state::*; @@ -185,14 +185,12 @@ impl HashTableConfig { } } - fn rebuild_hash_index( - &self, - capacity: usize, - iter: I, - ) -> Box + fn rebuild_hash_index(&self, capacity: usize, iter: I) -> Box where I: IntoIterator { if self.enable_experiment_hash_index { - Box::new(new_hash_index::NewHashIndex::rebuild_from_iter(capacity, iter)) + Box::new(new_hash_index::NewHashIndex::rebuild_from_iter( + capacity, iter, + )) } else { Box::new(hash_index::HashIndex::rebuild_from_iter(capacity, iter)) } diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index 73c4e6644d809..ba867f3b2286b 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -25,8 +25,7 @@ use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; use crate::aggregate::row_ptr::RowPtr; -/// Portions of this file are derived from excellent `hashbrown` crate: -/// https://github.com/rust-lang/hashbrown/ +// Portions of this file are derived from excellent `hashbrown` crate /// Single tag in a control group. #[derive(Copy, Clone, PartialEq, Eq)] @@ -72,11 +71,7 @@ impl BitMask { } fn lowest_set_bit(self) -> Option { - if let Some(nonzero) = NonZeroBitMaskWord::new(self.0) { - Some(Self::nonzero_trailing_zeros(nonzero)) - } else { - None - } + NonZeroBitMaskWord::new(self.0).map(Self::nonzero_trailing_zeros) } } @@ -133,7 +128,7 @@ impl Group { BitMask((self.0 & repeat(Tag(0x80))).to_le()) } - unsafe fn load(ctrls: &Vec, index: usize) -> Self { + unsafe fn load(ctrls: &[Tag], index: usize) -> Self { unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } } } @@ -300,10 +295,12 @@ impl NewHashIndex { for row in state.no_match_vector[..remaining_entries].iter().copied() { let skip = state.probe_skip[row]; - let (slot, is_new) = self.find_or_insert(state.group_hashes[row], skip); + let hash = state.group_hashes[row]; + let (slot, is_new) = self.find_or_insert(hash, skip); state.slots[row] = slot; if is_new { + self.set_ctrl(slot, Tag::full(hash)); state.empty_vector[new_entry_count] = row; new_entry_count += 1; } else { @@ -444,7 +441,7 @@ mod tests { } } - impl TableAdapter for &mut TestTableAdapter { + impl TableAdapter for TestTableAdapter { fn append_rows(&mut self, state: &mut ProbeState, new_entry_count: usize) { for row in state.empty_vector[..new_entry_count].iter() { let (key, hash) = self.incoming[*row]; @@ -491,8 +488,7 @@ mod tests { adapter.init_hash_index(&mut hash_index); - let count = - hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); + let count = hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); assert_eq!(1, count); let got = state.addresses[..state.row_count] @@ -506,4 +502,18 @@ mod tests { let want = HashMap::from_iter([(2, 22)]); assert_eq!(want, got); } + + #[test] + fn test_new_hash_index_batch_dedup() { + let capacity = 16; + let hash = 0x1234_5678_9abc_def0; + + let mut hash_index = NewHashIndex::with_capacity(capacity); + let mut adapter = TestTableAdapter::new(vec![(1, hash), (1, hash), (1, hash)], vec![]); + let mut state = adapter.init_state(); + + let count = hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); + + assert_eq!(1, count); + } } diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs index 207c6a4af43e7..dd3bb30224f97 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_final.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs @@ -153,8 +153,7 @@ impl IPhysicalPlan for AggregateFinal { let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?; let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?; let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; - let enable_experiment_hash_index = - builder.settings.get_enable_experiment_hash_index()?; + let enable_experiment_hash_index = builder.settings.get_enable_experiment_hash_index()?; let mut is_cluster_aggregate = false; if ExchangeSource::check_physical_plan(&self.input) { diff --git a/src/query/service/src/physical_plans/physical_aggregate_partial.rs b/src/query/service/src/physical_plans/physical_aggregate_partial.rs index a9a20fa6e4995..bb93b3344fc69 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_partial.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_partial.rs @@ -179,8 +179,7 @@ impl IPhysicalPlan for AggregatePartial { let max_threads = builder.settings.get_max_threads()?; let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?; let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?; - let enable_experiment_hash_index = - builder.settings.get_enable_experiment_hash_index()?; + let enable_experiment_hash_index = builder.settings.get_enable_experiment_hash_index()?; let cluster = &builder.ctx.get_cluster(); let params = PipelineBuilder::build_aggregator_params( From 7c5b7c83193f907f69ccfab252a879bbbb763730 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 15:23:45 +0800 Subject: [PATCH 11/25] enable for test --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 5451de6baf767..162332bd4db01 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1522,7 +1522,7 @@ impl DefaultSettings { range: Some(SettingRange::String(vec!["auto".into(),"row".into(), "bucket".into()])), }), ("enable_experiment_hash_index", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(1), desc: "experiment setting enable hash index(disable by default).", mode: SettingMode::Both, scope: SettingScope::Both, From 410402d03f3650765241fe9649c549d599ae455d Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 17:44:55 +0800 Subject: [PATCH 12/25] add simd impl for aarch64 --- .../src/aggregate/new_hash_index.rs | 106 ++++++++++++++---- 1 file changed, 87 insertions(+), 19 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index ba867f3b2286b..d5cef7e88ccfd 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -23,6 +23,7 @@ use crate::ProbeState; use crate::aggregate::BATCH_SIZE; use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; +use crate::aggregate::new_hash_index::group::Group; use crate::aggregate::row_ptr::RowPtr; // Portions of this file are derived from excellent `hashbrown` crate @@ -109,27 +110,94 @@ fn repeat(tag: Tag) -> u64 { u64::from_ne_bytes([tag.0; Group::WIDTH]) } -#[derive(Copy, Clone)] -struct Group(u64); +pub(crate) mod group { + + #[cfg(not(all( + target_arch = "aarch64", + target_feature = "neon", + // NEON intrinsics are currently broken on big-endian targets. + // See https://github.com/rust-lang/stdarch/issues/1484. + target_endian = "little", + not(miri), + )))] + pub(crate) use generic::Group; + #[cfg(all( + target_arch = "aarch64", + target_feature = "neon", + // NEON intrinsics are currently broken on big-endian targets. + // See https://github.com/rust-lang/stdarch/issues/1484. + target_endian = "little", + not(miri), + ))] + pub(crate) use neon::Group; + + mod generic { + use crate::aggregate::new_hash_index::BitMask; + use crate::aggregate::new_hash_index::Tag; + use crate::aggregate::new_hash_index::repeat; + + #[derive(Copy, Clone)] + pub(crate) struct Group(u64); + + impl Group { + /// Number of bytes in the group. + pub(crate) const WIDTH: usize = 8; + + #[inline] + pub(crate) fn match_tag(self, tag: Tag) -> BitMask { + // This algorithm is derived from + // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord + let cmp = self.0 ^ repeat(tag); + BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) + } -impl Group { - /// Number of bytes in the group. - const WIDTH: usize = 8; + #[inline] + pub(crate) fn match_empty(self) -> BitMask { + BitMask((self.0 & repeat(Tag(0x80))).to_le()) + } - fn match_tag(self, tag: Tag) -> BitMask { - // This algorithm is derived from - // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord - let cmp = self.0 ^ repeat(tag); - BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) + #[inline] + pub(crate) unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } + } + } } - #[inline] - pub(crate) fn match_empty(self) -> BitMask { - BitMask((self.0 & repeat(Tag(0x80))).to_le()) - } + mod neon { + use core::arch::aarch64 as neon; + use std::mem; + + use crate::aggregate::new_hash_index::BitMask; + use crate::aggregate::new_hash_index::Tag; + + #[derive(Copy, Clone)] + pub(crate) struct Group(neon::uint8x8_t); - unsafe fn load(ctrls: &[Tag], index: usize) -> Self { - unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } + impl Group { + /// Number of bytes in the group. + pub(crate) const WIDTH: usize = mem::size_of::(); + + #[inline] + pub(crate) fn match_tag(self, tag: Tag) -> BitMask { + unsafe { + let cmp = neon::vceq_u8(self.0, neon::vdup_n_u8(tag.0)); + BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) + } + } + + #[inline] + pub(crate) fn match_empty(self) -> BitMask { + unsafe { + let cmp = neon::vcltz_s8(neon::vreinterpret_s8_u8(self.0)); + BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) + } + } + + #[inline] + pub(crate) unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + unsafe { Group(neon::vld1_u8(ctrls.as_ptr().add(index) as *const u8)) } + } + } } } @@ -194,8 +262,7 @@ impl NewHashIndex { where I: IntoIterator { let mut hash_index = NewHashIndex::with_capacity(capacity); for (hash, row_ptr) in iter { - let slot = hash_index.probe_slot(hash); - hash_index.set_ctrl(slot, Tag::full(hash)); + let slot = hash_index.probe_empty_and_set_ctrl(hash); hash_index.pointers[slot] = row_ptr; hash_index.count += 1; } @@ -262,11 +329,12 @@ impl NewHashIndex { } } - pub fn probe_slot(&mut self, hash: u64) -> usize { + pub fn probe_empty_and_set_ctrl(&mut self, hash: u64) -> usize { let mut probe_seq = self.probe_seq(hash); loop { let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; if let Some(index) = self.find_insert_index_in_group(&group, &probe_seq.pos) { + self.set_ctrl(index, Tag::full(hash)); return index; } probe_seq.move_next(self.bucket_mask); From da5cb152487bd3a79133b30743189ec0bd10da90 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 21:21:09 +0800 Subject: [PATCH 13/25] fix wrong result from salt collision --- .../src/aggregate/new_hash_index.rs | 44 ++++++++----------- .../expression/src/aggregate/probe_state.rs | 2 - 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index d5cef7e88ccfd..1e6f5e784eb61 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -290,11 +290,8 @@ impl NewHashIndex { } #[inline] - fn probe_seq(&self, hash: u64) -> ProbeSeq { - ProbeSeq { - pos: hash as usize & self.bucket_mask, - stride: 0, - } + fn h1(&self, hash: u64) -> usize { + hash as usize & self.bucket_mask } #[inline] @@ -308,36 +305,34 @@ impl NewHashIndex { } } - pub fn find_or_insert(&mut self, hash: u64, mut skip: usize) -> (usize, bool) { + pub fn find_or_insert(&mut self, mut pos: usize, hash: u64) -> (usize, bool) { let mut insert_index = None; let tag_hash = Tag::full(hash); - let mut probe_seq = self.probe_seq(hash); loop { - let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; + let group = unsafe { Group::load(&self.ctrls, pos) }; for bit in group.match_tag(tag_hash) { - let index = (probe_seq.pos + bit) & self.bucket_mask; - if likely(skip == 0) { - return (index, false); - } - skip -= 1; + let index = (pos + bit) & self.bucket_mask; + return (index, false); } - insert_index = self.find_insert_index_in_group(&group, &probe_seq.pos); + insert_index = self.find_insert_index_in_group(&group, &pos); if let Some(index) = insert_index { + self.set_ctrl(index, tag_hash); return (index, true); } - probe_seq.move_next(self.bucket_mask); + + pos = (pos + Group::WIDTH) & self.bucket_mask; } } pub fn probe_empty_and_set_ctrl(&mut self, hash: u64) -> usize { - let mut probe_seq = self.probe_seq(hash); + let mut pos = self.h1(hash); loop { - let group = unsafe { Group::load(&self.ctrls, probe_seq.pos) }; - if let Some(index) = self.find_insert_index_in_group(&group, &probe_seq.pos) { + let group = unsafe { Group::load(&self.ctrls, pos) }; + if let Some(index) = self.find_insert_index_in_group(&group, &pos) { self.set_ctrl(index, Tag::full(hash)); return index; } - probe_seq.move_next(self.bucket_mask); + pos = (pos + Group::WIDTH) & self.bucket_mask; } } @@ -350,7 +345,7 @@ impl NewHashIndex { debug_assert!(self.capacity > 0); for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() { *row = i.into(); - state.probe_skip[i] = 0; + state.slots[i] = self.h1(state.group_hashes[i]); } let mut new_group_count = 0; @@ -362,13 +357,11 @@ impl NewHashIndex { let mut no_match_count = 0; for row in state.no_match_vector[..remaining_entries].iter().copied() { - let skip = state.probe_skip[row]; - let hash = state.group_hashes[row]; - let (slot, is_new) = self.find_or_insert(hash, skip); + let slot = &mut state.slots[row]; + let (slot, is_new) = self.find_or_insert(*slot, state.group_hashes[row]); state.slots[row] = slot; if is_new { - self.set_ctrl(slot, Tag::full(hash)); state.empty_vector[new_entry_count] = row; new_entry_count += 1; } else { @@ -402,7 +395,8 @@ impl NewHashIndex { } for row in state.no_match_vector[..no_match_count].iter().copied() { - state.probe_skip[row] += 1; + let slot = state.slots[row]; + state.slots[row] = (slot + 1) & self.bucket_mask; } remaining_entries = no_match_count; diff --git a/src/query/expression/src/aggregate/probe_state.rs b/src/query/expression/src/aggregate/probe_state.rs index 058a0e12c351e..7b9b04243fb26 100644 --- a/src/query/expression/src/aggregate/probe_state.rs +++ b/src/query/expression/src/aggregate/probe_state.rs @@ -28,7 +28,6 @@ pub struct ProbeState { pub(super) group_hashes: [u64; BATCH_SIZE], pub(super) addresses: [RowPtr; BATCH_SIZE], pub(super) page_index: [usize; BATCH_SIZE], - pub(super) probe_skip: [usize; BATCH_SIZE], pub(super) state_places: [StateAddr; BATCH_SIZE], pub(super) empty_vector: SelectVector, @@ -47,7 +46,6 @@ impl Default for ProbeState { group_hashes: [0; BATCH_SIZE], addresses: [RowPtr::null(); BATCH_SIZE], page_index: [0; BATCH_SIZE], - probe_skip: [0; BATCH_SIZE], state_places: [StateAddr::null(); BATCH_SIZE], group_compare_vector: [RowID::default(); BATCH_SIZE], no_match_vector: [RowID::default(); BATCH_SIZE], From 0f3dbf25832bf747a2c394eba2a1ab271b03bdcc Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 22:04:11 +0800 Subject: [PATCH 14/25] fix: change visibility of Group struct and its methods in new_hash_index.rs --- .../src/aggregate/new_hash_index.rs | 164 ++---------------- 1 file changed, 13 insertions(+), 151 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index 1e6f5e784eb61..397ceb3abd74c 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -110,17 +110,15 @@ fn repeat(tag: Tag) -> u64 { u64::from_ne_bytes([tag.0; Group::WIDTH]) } -pub(crate) mod group { +mod group { #[cfg(not(all( target_arch = "aarch64", target_feature = "neon", - // NEON intrinsics are currently broken on big-endian targets. - // See https://github.com/rust-lang/stdarch/issues/1484. target_endian = "little", not(miri), )))] - pub(crate) use generic::Group; + pub use generic::Group; #[cfg(all( target_arch = "aarch64", target_feature = "neon", @@ -129,7 +127,7 @@ pub(crate) mod group { target_endian = "little", not(miri), ))] - pub(crate) use neon::Group; + pub use neon::Group; mod generic { use crate::aggregate::new_hash_index::BitMask; @@ -137,14 +135,14 @@ pub(crate) mod group { use crate::aggregate::new_hash_index::repeat; #[derive(Copy, Clone)] - pub(crate) struct Group(u64); + pub struct Group(u64); impl Group { /// Number of bytes in the group. - pub(crate) const WIDTH: usize = 8; + pub const WIDTH: usize = 8; #[inline] - pub(crate) fn match_tag(self, tag: Tag) -> BitMask { + pub fn match_tag(self, tag: Tag) -> BitMask { // This algorithm is derived from // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord let cmp = self.0 ^ repeat(tag); @@ -152,12 +150,12 @@ pub(crate) mod group { } #[inline] - pub(crate) fn match_empty(self) -> BitMask { + pub fn match_empty(self) -> BitMask { BitMask((self.0 & repeat(Tag(0x80))).to_le()) } #[inline] - pub(crate) unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } } } @@ -171,14 +169,14 @@ pub(crate) mod group { use crate::aggregate::new_hash_index::Tag; #[derive(Copy, Clone)] - pub(crate) struct Group(neon::uint8x8_t); + pub struct Group(neon::uint8x8_t); impl Group { /// Number of bytes in the group. - pub(crate) const WIDTH: usize = mem::size_of::(); + pub const WIDTH: usize = mem::size_of::(); #[inline] - pub(crate) fn match_tag(self, tag: Tag) -> BitMask { + pub fn match_tag(self, tag: Tag) -> BitMask { unsafe { let cmp = neon::vceq_u8(self.0, neon::vdup_n_u8(tag.0)); BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) @@ -186,7 +184,7 @@ pub(crate) mod group { } #[inline] - pub(crate) fn match_empty(self) -> BitMask { + pub fn match_empty(self) -> BitMask { unsafe { let cmp = neon::vcltz_s8(neon::vreinterpret_s8_u8(self.0)); BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) @@ -194,7 +192,7 @@ pub(crate) mod group { } #[inline] - pub(crate) unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { unsafe { Group(neon::vld1_u8(ctrls.as_ptr().add(index) as *const u8)) } } } @@ -443,139 +441,3 @@ impl HashIndexOps for NewHashIndex { NewHashIndex::probe_and_create(self, state, row_count, adapter) } } - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use super::*; - use crate::ProbeState; - - struct TestTableAdapter { - incoming: Vec<(u64, u64)>, // (key, hash) - payload: Vec<(u64, u64, u64)>, // (key, hash, value) - init_count: usize, - pin_data: Box<[u8]>, - } - - impl TestTableAdapter { - fn new(incoming: Vec<(u64, u64)>, payload: Vec<(u64, u64, u64)>) -> Self { - Self { - incoming, - init_count: payload.len(), - payload, - pin_data: vec![0; 1000].into(), - } - } - - fn init_state(&self) -> ProbeState { - let mut state = ProbeState { - row_count: self.incoming.len(), - ..Default::default() - }; - - for (i, (_, hash)) in self.incoming.iter().enumerate() { - state.group_hashes[i] = *hash - } - - state - } - - fn init_hash_index(&self, hash_index: &mut NewHashIndex) { - for (i, (_, hash, _)) in self.payload.iter().copied().enumerate() { - let slot = hash_index.probe_slot(hash); - hash_index.set_ctrl(slot, Tag::full(hash)); - hash_index.pointers[slot] = self.get_row_ptr(false, i); - } - } - - fn get_row_ptr(&self, incoming: bool, row: usize) -> RowPtr { - RowPtr::new(unsafe { - self.pin_data - .as_ptr() - .add(if incoming { row + self.init_count } else { row }) as _ - }) - } - - fn get_payload(&self, row_ptr: RowPtr) -> (u64, u64, u64) { - let index = row_ptr.as_ptr() as usize - self.pin_data.as_ptr() as usize; - self.payload[index] - } - } - - impl TableAdapter for TestTableAdapter { - fn append_rows(&mut self, state: &mut ProbeState, new_entry_count: usize) { - for row in state.empty_vector[..new_entry_count].iter() { - let (key, hash) = self.incoming[*row]; - let value = key + 20; - - self.payload.push((key, hash, value)); - state.addresses[*row] = self.get_row_ptr(true, row.to_usize()); - } - } - - fn compare( - &mut self, - state: &mut ProbeState, - need_compare_count: usize, - mut no_match_count: usize, - ) -> usize { - for row in state.group_compare_vector[..need_compare_count] - .iter() - .copied() - { - let incoming = self.incoming[row]; - let (key, _, _) = self.get_payload(state.addresses[row]); - if incoming.0 == key { - continue; - } - - state.no_match_vector[no_match_count] = row; - no_match_count += 1; - } - - no_match_count - } - } - - #[test] - fn test_new_hash_index_tag_collision_skip() { - let capacity = 16; - let hash1 = 0x7f00_0000_0000_0001; - let hash2 = 0x7f00_0000_0000_0002; - - let mut hash_index = NewHashIndex::with_capacity(capacity); - let mut adapter = TestTableAdapter::new(vec![(2, hash2)], vec![(1, hash1, 100)]); - let mut state = adapter.init_state(); - - adapter.init_hash_index(&mut hash_index); - - let count = hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); - assert_eq!(1, count); - - let got = state.addresses[..state.row_count] - .iter() - .map(|row_ptr| { - let (key, _, value) = adapter.get_payload(*row_ptr); - (key, value) - }) - .collect::>(); - - let want = HashMap::from_iter([(2, 22)]); - assert_eq!(want, got); - } - - #[test] - fn test_new_hash_index_batch_dedup() { - let capacity = 16; - let hash = 0x1234_5678_9abc_def0; - - let mut hash_index = NewHashIndex::with_capacity(capacity); - let mut adapter = TestTableAdapter::new(vec![(1, hash), (1, hash), (1, hash)], vec![]); - let mut state = adapter.init_state(); - - let count = hash_index.probe_and_create(&mut state, adapter.incoming.len(), &mut adapter); - - assert_eq!(1, count); - } -} From 603dce54306469ec03a80ec65800c8d20c646eda Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Wed, 21 Jan 2026 22:24:34 +0800 Subject: [PATCH 15/25] ok,listen to clippy --- src/query/expression/src/aggregate/new_hash_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index.rs index 397ceb3abd74c..b809292198cff 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index.rs @@ -308,7 +308,7 @@ impl NewHashIndex { let tag_hash = Tag::full(hash); loop { let group = unsafe { Group::load(&self.ctrls, pos) }; - for bit in group.match_tag(tag_hash) { + if let Some(bit) = group.match_tag(tag_hash).into_iter().next() { let index = (pos + bit) & self.bucket_mask; return (index, false); } From 01e885c9a0db47e731cd6ecd1a2eadcfe124f46e Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 22 Jan 2026 10:09:08 +0800 Subject: [PATCH 16/25] reorg the files structure --- Cargo.lock | 1 + src/query/expression/Cargo.toml | 1 + src/query/expression/src/aggregate/mod.rs | 18 +- .../src/aggregate/new_hash_index/bitmask.rs | 99 +++++++++ .../src/aggregate/new_hash_index/group.rs | 108 ++++++++++ .../src/aggregate/new_hash_index/mod.rs | 22 ++ .../{ => new_hash_index}/new_hash_index.rs | 203 +----------------- 7 files changed, 243 insertions(+), 209 deletions(-) create mode 100644 src/query/expression/src/aggregate/new_hash_index/bitmask.rs create mode 100644 src/query/expression/src/aggregate/new_hash_index/group.rs create mode 100644 src/query/expression/src/aggregate/new_hash_index/mod.rs rename src/query/expression/src/aggregate/{ => new_hash_index}/new_hash_index.rs (57%) diff --git a/Cargo.lock b/Cargo.lock index b4830326582df..58b9ad06dbef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3639,6 +3639,7 @@ dependencies = [ "base64 0.22.1", "borsh", "bumpalo", + "cfg-if", "comfy-table", "databend-common-ast", "databend-common-base", diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml index 1abbed5f8d5c5..0bc320a26ec4b 100644 --- a/src/query/expression/Cargo.toml +++ b/src/query/expression/Cargo.toml @@ -19,6 +19,7 @@ async-backtrace = { workspace = true } base64 = { workspace = true } borsh = { workspace = true } bumpalo = { workspace = true } +cfg-if = { workspace = true } comfy-table = { workspace = true } databend-common-ast = { workspace = true } databend-common-base = { workspace = true } diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 6087a2d2c7229..5ddb5ba26ace8 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -36,7 +36,6 @@ pub use aggregate_function::*; pub use aggregate_function_state::*; pub use aggregate_hashtable::*; pub use group_hash::*; -use hash_index::Entry; use hash_index::HashIndexOps; pub use partitioned_payload::*; pub use payload::*; @@ -45,6 +44,9 @@ pub use probe_state::ProbeState; use probe_state::*; use row_ptr::*; +use crate::aggregate::hash_index::HashIndex; +use crate::aggregate::new_hash_index::NewHashIndex; + // A batch size to probe, flush, repartition, etc. pub(crate) const BATCH_SIZE: usize = 2048; const LOAD_FACTOR: f64 = 1.5; @@ -171,28 +173,26 @@ impl HashTableConfig { fn new_hash_index(&self, capacity: usize) -> Box { if self.enable_experiment_hash_index { - Box::new(new_hash_index::NewHashIndex::with_capacity(capacity)) + Box::new(NewHashIndex::with_capacity(capacity)) } else { - Box::new(hash_index::HashIndex::with_capacity(capacity)) + Box::new(HashIndex::with_capacity(capacity)) } } fn new_dummy_hash_index(&self) -> Box { if self.enable_experiment_hash_index { - Box::new(new_hash_index::NewHashIndex::dummy()) + Box::new(NewHashIndex::dummy()) } else { - Box::new(hash_index::HashIndex::dummy()) + Box::new(HashIndex::dummy()) } } fn rebuild_hash_index(&self, capacity: usize, iter: I) -> Box where I: IntoIterator { if self.enable_experiment_hash_index { - Box::new(new_hash_index::NewHashIndex::rebuild_from_iter( - capacity, iter, - )) + Box::new(NewHashIndex::rebuild_from_iter(capacity, iter)) } else { - Box::new(hash_index::HashIndex::rebuild_from_iter(capacity, iter)) + Box::new(HashIndex::rebuild_from_iter(capacity, iter)) } } } diff --git a/src/query/expression/src/aggregate/new_hash_index/bitmask.rs b/src/query/expression/src/aggregate/new_hash_index/bitmask.rs new file mode 100644 index 0000000000000..386bc5e85fdf9 --- /dev/null +++ b/src/query/expression/src/aggregate/new_hash_index/bitmask.rs @@ -0,0 +1,99 @@ +// Copyright (c) 2016 Amanieu d'Antras +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::num::NonZeroU64; + +use crate::aggregate::new_hash_index::group::Group; + +const BITMASK_ITER_MASK: u64 = 0x8080_8080_8080_8080; + +const BITMASK_STRIDE: usize = 8; + +type NonZeroBitMaskWord = NonZeroU64; + +#[derive(Copy, Clone)] +pub(super) struct BitMask(pub(super) u64); + +impl BitMask { + #[inline] + #[must_use] + pub(super) fn remove_lowest_bit(self) -> Self { + BitMask(self.0 & (self.0 - 1)) + } + + #[inline] + pub(super) fn nonzero_trailing_zeros(nonzero: NonZeroBitMaskWord) -> usize { + if cfg!(target_arch = "arm") && BITMASK_STRIDE % 8 == 0 { + // SAFETY: A byte-swapped non-zero value is still non-zero. + let swapped = unsafe { NonZeroBitMaskWord::new_unchecked(nonzero.get().swap_bytes()) }; + swapped.leading_zeros() as usize / BITMASK_STRIDE + } else { + nonzero.trailing_zeros() as usize / BITMASK_STRIDE + } + } + + pub(super) fn lowest_set_bit(self) -> Option { + NonZeroBitMaskWord::new(self.0).map(Self::nonzero_trailing_zeros) + } +} + +impl IntoIterator for BitMask { + type Item = usize; + type IntoIter = BitMaskIter; + + #[inline] + fn into_iter(self) -> BitMaskIter { + // A BitMask only requires each element (group of bits) to be non-zero. + // However for iteration we need each element to only contain 1 bit. + BitMaskIter(BitMask(self.0 & BITMASK_ITER_MASK)) + } +} + +/// Iterator over the contents of a `BitMask`, returning the indices of set +/// bits. +#[derive(Clone)] +pub(super) struct BitMaskIter(BitMask); + +impl Iterator for BitMaskIter { + type Item = usize; + + #[inline] + fn next(&mut self) -> Option { + let bit = self.0.lowest_set_bit()?; + self.0 = self.0.remove_lowest_bit(); + Some(bit) + } +} + +/// Single tag in a control group. +#[derive(Copy, Clone, PartialEq, Eq)] +#[repr(transparent)] +pub(super) struct Tag(pub(super) u8); +impl Tag { + /// Control tag value for an empty bucket. + pub(super) const EMPTY: Tag = Tag(0b1111_1111); + + /// Creates a control tag representing a full bucket with the given hash. + #[inline] + pub(super) const fn full(hash: u64) -> Tag { + let top7 = hash >> (8 * 8 - 7); + Tag((top7 & 0x7f) as u8) // truncation + } +} +/// Helper function to replicate a tag across a `GroupWord`. +#[inline] +pub(super) fn repeat(tag: Tag) -> u64 { + u64::from_ne_bytes([tag.0; Group::WIDTH]) +} diff --git a/src/query/expression/src/aggregate/new_hash_index/group.rs b/src/query/expression/src/aggregate/new_hash_index/group.rs new file mode 100644 index 0000000000000..89a510cd41ca1 --- /dev/null +++ b/src/query/expression/src/aggregate/new_hash_index/group.rs @@ -0,0 +1,108 @@ +// Copyright (c) 2016 Amanieu d'Antras +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(all( + target_arch = "aarch64", + target_feature = "neon", + // NEON intrinsics are currently broken on big-endian targets. + // See https://github.com/rust-lang/stdarch/issues/1484. + target_endian = "little", + not(miri), +))] { + pub(super) use self::neon::Group; + } else{ + pub(super) use self::generic::Group; + + } +} + +#[cfg(all( + target_arch = "aarch64", + target_feature = "neon", + // NEON intrinsics are currently broken on big-endian targets. + // See https://github.com/rust-lang/stdarch/issues/1484. + target_endian = "little", + not(miri), +))] +mod neon { + use core::arch::aarch64 as neon; + use std::mem; + + use crate::aggregate::new_hash_index::bitmask::BitMask; + use crate::aggregate::new_hash_index::bitmask::Tag; + + #[derive(Copy, Clone)] + pub struct Group(neon::uint8x8_t); + + impl Group { + /// Number of bytes in the group. + pub const WIDTH: usize = mem::size_of::(); + + #[inline] + pub fn match_tag(self, tag: Tag) -> BitMask { + unsafe { + let cmp = neon::vceq_u8(self.0, neon::vdup_n_u8(tag.0)); + BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) + } + } + + #[inline] + pub fn match_empty(self) -> BitMask { + unsafe { + let cmp = neon::vcltz_s8(neon::vreinterpret_s8_u8(self.0)); + BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) + } + } + + #[inline] + pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + unsafe { Group(neon::vld1_u8(ctrls.as_ptr().add(index) as *const u8)) } + } + } +} + +mod generic { + use crate::aggregate::new_hash_index::bitmask::BitMask; + use crate::aggregate::new_hash_index::bitmask::Tag; + use crate::aggregate::new_hash_index::bitmask::repeat; + + #[derive(Copy, Clone)] + pub struct Group(u64); + + impl Group { + /// Number of bytes in the group. + pub const WIDTH: usize = 8; + + #[inline] + pub fn match_tag(self, tag: Tag) -> BitMask { + // This algorithm is derived from + // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord + let cmp = self.0 ^ repeat(tag); + BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) + } + + #[inline] + pub fn match_empty(self) -> BitMask { + BitMask((self.0 & repeat(Tag(0x80))).to_le()) + } + + #[inline] + pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { + unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } + } + } +} diff --git a/src/query/expression/src/aggregate/new_hash_index/mod.rs b/src/query/expression/src/aggregate/new_hash_index/mod.rs new file mode 100644 index 0000000000000..a603a94c935c4 --- /dev/null +++ b/src/query/expression/src/aggregate/new_hash_index/mod.rs @@ -0,0 +1,22 @@ +// Copyright (c) 2016 Amanieu d'Antras +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Portions of this mod are derived from the excellent +/// [hashbrown](https://github.com/rust-lang/hashbrown/tree/master) crate +mod bitmask; +mod group; +mod new_hash_index; + +pub use new_hash_index::NewHashIndex; diff --git a/src/query/expression/src/aggregate/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs similarity index 57% rename from src/query/expression/src/aggregate/new_hash_index.rs rename to src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs index b809292198cff..d223478e078f9 100644 --- a/src/query/expression/src/aggregate/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs @@ -14,212 +14,15 @@ // limitations under the License. use std::hint::likely; -use std::num::NonZeroU64; -use std::ptr::NonNull; - -use databend_common_ast::parser::token::GROUP; use crate::ProbeState; -use crate::aggregate::BATCH_SIZE; +use crate::aggregate::LOAD_FACTOR; use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; +use crate::aggregate::new_hash_index::bitmask::Tag; use crate::aggregate::new_hash_index::group::Group; use crate::aggregate::row_ptr::RowPtr; -// Portions of this file are derived from excellent `hashbrown` crate - -/// Single tag in a control group. -#[derive(Copy, Clone, PartialEq, Eq)] -#[repr(transparent)] -struct Tag(u8); -impl Tag { - /// Control tag value for an empty bucket. - const EMPTY: Tag = Tag(0b1111_1111); - - /// Creates a control tag representing a full bucket with the given hash. - #[inline] - const fn full(hash: u64) -> Tag { - let top7 = hash >> (8 * 8 - 7); - Tag((top7 & 0x7f) as u8) // truncation - } -} - -const BITMASK_ITER_MASK: u64 = 0x8080_8080_8080_8080; - -const BITMASK_STRIDE: usize = 8; - -type NonZeroBitMaskWord = NonZeroU64; - -#[derive(Copy, Clone)] -struct BitMask(u64); - -impl BitMask { - #[inline] - #[must_use] - fn remove_lowest_bit(self) -> Self { - BitMask(self.0 & (self.0 - 1)) - } - - #[inline] - fn nonzero_trailing_zeros(nonzero: NonZeroBitMaskWord) -> usize { - if cfg!(target_arch = "arm") && BITMASK_STRIDE % 8 == 0 { - // SAFETY: A byte-swapped non-zero value is still non-zero. - let swapped = unsafe { NonZeroBitMaskWord::new_unchecked(nonzero.get().swap_bytes()) }; - swapped.leading_zeros() as usize / BITMASK_STRIDE - } else { - nonzero.trailing_zeros() as usize / BITMASK_STRIDE - } - } - - fn lowest_set_bit(self) -> Option { - NonZeroBitMaskWord::new(self.0).map(Self::nonzero_trailing_zeros) - } -} - -impl IntoIterator for BitMask { - type Item = usize; - type IntoIter = BitMaskIter; - - #[inline] - fn into_iter(self) -> BitMaskIter { - // A BitMask only requires each element (group of bits) to be non-zero. - // However for iteration we need each element to only contain 1 bit. - BitMaskIter(BitMask(self.0 & BITMASK_ITER_MASK)) - } -} - -/// Iterator over the contents of a `BitMask`, returning the indices of set -/// bits. -#[derive(Clone)] -struct BitMaskIter(BitMask); - -impl Iterator for BitMaskIter { - type Item = usize; - - #[inline] - fn next(&mut self) -> Option { - let bit = self.0.lowest_set_bit()?; - self.0 = self.0.remove_lowest_bit(); - Some(bit) - } -} - -/// Helper function to replicate a tag across a `GroupWord`. -#[inline] -fn repeat(tag: Tag) -> u64 { - u64::from_ne_bytes([tag.0; Group::WIDTH]) -} - -mod group { - - #[cfg(not(all( - target_arch = "aarch64", - target_feature = "neon", - target_endian = "little", - not(miri), - )))] - pub use generic::Group; - #[cfg(all( - target_arch = "aarch64", - target_feature = "neon", - // NEON intrinsics are currently broken on big-endian targets. - // See https://github.com/rust-lang/stdarch/issues/1484. - target_endian = "little", - not(miri), - ))] - pub use neon::Group; - - mod generic { - use crate::aggregate::new_hash_index::BitMask; - use crate::aggregate::new_hash_index::Tag; - use crate::aggregate::new_hash_index::repeat; - - #[derive(Copy, Clone)] - pub struct Group(u64); - - impl Group { - /// Number of bytes in the group. - pub const WIDTH: usize = 8; - - #[inline] - pub fn match_tag(self, tag: Tag) -> BitMask { - // This algorithm is derived from - // https://graphics.stanford.edu/~seander/bithacks.html##ValueInWord - let cmp = self.0 ^ repeat(tag); - BitMask((cmp.wrapping_sub(repeat(Tag(0x01))) & !cmp & repeat(Tag(0x80))).to_le()) - } - - #[inline] - pub fn match_empty(self) -> BitMask { - BitMask((self.0 & repeat(Tag(0x80))).to_le()) - } - - #[inline] - pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { - unsafe { Group((ctrls.as_ptr().add(index) as *const u64).read_unaligned()) } - } - } - } - - mod neon { - use core::arch::aarch64 as neon; - use std::mem; - - use crate::aggregate::new_hash_index::BitMask; - use crate::aggregate::new_hash_index::Tag; - - #[derive(Copy, Clone)] - pub struct Group(neon::uint8x8_t); - - impl Group { - /// Number of bytes in the group. - pub const WIDTH: usize = mem::size_of::(); - - #[inline] - pub fn match_tag(self, tag: Tag) -> BitMask { - unsafe { - let cmp = neon::vceq_u8(self.0, neon::vdup_n_u8(tag.0)); - BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) - } - } - - #[inline] - pub fn match_empty(self) -> BitMask { - unsafe { - let cmp = neon::vcltz_s8(neon::vreinterpret_s8_u8(self.0)); - BitMask(neon::vget_lane_u64(neon::vreinterpret_u64_u8(cmp), 0)) - } - } - - #[inline] - pub unsafe fn load(ctrls: &[Tag], index: usize) -> Self { - unsafe { Group(neon::vld1_u8(ctrls.as_ptr().add(index) as *const u8)) } - } - } - } -} - -#[derive(Clone)] -struct ProbeSeq { - pos: usize, - stride: usize, -} - -impl ProbeSeq { - #[inline] - fn move_next(&mut self, bucket_mask: usize) { - // We should have found an empty bucket by now and ended the probe. - debug_assert!( - self.stride <= bucket_mask, - "Went past end of probe sequence" - ); - - self.stride += Group::WIDTH; - self.pos += self.stride; - self.pos &= bucket_mask; - } -} - pub struct NewHashIndex { ctrls: Vec, pointers: Vec, @@ -415,7 +218,7 @@ impl HashIndexOps for NewHashIndex { } fn resize_threshold(&self) -> usize { - (self.capacity as f64 / super::LOAD_FACTOR) as usize + (self.capacity as f64 / LOAD_FACTOR) as usize } fn allocated_bytes(&self) -> usize { From ffc8e59e93efb74df6cf00be6a8f7f65ba982807 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 22 Jan 2026 13:18:00 +0800 Subject: [PATCH 17/25] performance tuning on probe_slot --- .../src/aggregate/new_hash_index/group.rs | 1 + .../new_hash_index/new_hash_index.rs | 31 +++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index/group.rs b/src/query/expression/src/aggregate/new_hash_index/group.rs index 89a510cd41ca1..bdd25c81d89c5 100644 --- a/src/query/expression/src/aggregate/new_hash_index/group.rs +++ b/src/query/expression/src/aggregate/new_hash_index/group.rs @@ -12,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + use cfg_if::cfg_if; cfg_if! { diff --git a/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs index d223478e078f9..dfa3db82401f4 100644 --- a/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs @@ -63,7 +63,7 @@ impl NewHashIndex { where I: IntoIterator { let mut hash_index = NewHashIndex::with_capacity(capacity); for (hash, row_ptr) in iter { - let slot = hash_index.probe_empty_and_set_ctrl(hash); + let slot = hash_index.probe_empty(hash); hash_index.pointers[slot] = row_ptr; hash_index.count += 1; } @@ -75,7 +75,7 @@ impl NewHashIndex { #[inline] fn ctrl(&mut self, index: usize) -> *mut Tag { debug_assert!(index < self.ctrls.len()); - unsafe { self.ctrls.as_mut_ptr().add(index) } + unsafe { self.ctrls.get_unchecked_mut(index) } } #[inline] @@ -125,7 +125,10 @@ impl NewHashIndex { } } - pub fn probe_empty_and_set_ctrl(&mut self, hash: u64) -> usize { + /// Probes the hash table for an empty slot using SIMD groups (batches) and sets the control byte. + /// + /// Returns the index of the found slot. + pub fn probe_empty_batch(&mut self, hash: u64) -> usize { let mut pos = self.h1(hash); loop { let group = unsafe { Group::load(&self.ctrls, pos) }; @@ -137,6 +140,28 @@ impl NewHashIndex { } } + /// Probes the hash table linearly (scalar probing) for an empty slot and sets the control byte. + /// Returns the absolute index of the slot. + /// + /// # Performance Note + /// This method is primarily used during resize operations. In such cases, the map is very + /// sparse, meaning collisions are rare. + /// + /// While SIMD probing (`probe_empty_batch`) is efficient for skipping full groups, it has + /// overhead. When the map is sparse, we expect to find an empty slot almost immediately + /// (often the first probe). In this specific situation, a simple scalar probe is faster + pub fn probe_empty(&mut self, hash: u64) -> usize { + let mut pos = self.h1(hash); + loop { + let ctrl = unsafe { *self.ctrl(pos) }; + if ctrl == Tag::EMPTY { + self.set_ctrl(pos, Tag::full(hash)); + return pos; + } + pos = (pos + 1) & self.bucket_mask; + } + } + pub fn probe_and_create( &mut self, state: &mut ProbeState, From b59dcaa49f0071f834ac22e71486dbbdc8155504 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Thu, 22 Jan 2026 13:37:50 +0800 Subject: [PATCH 18/25] clippy --- .../new_hash_index/{new_hash_index.rs => hash_index.rs} | 0 src/query/expression/src/aggregate/new_hash_index/mod.rs | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename src/query/expression/src/aggregate/new_hash_index/{new_hash_index.rs => hash_index.rs} (100%) diff --git a/src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs similarity index 100% rename from src/query/expression/src/aggregate/new_hash_index/new_hash_index.rs rename to src/query/expression/src/aggregate/new_hash_index/hash_index.rs diff --git a/src/query/expression/src/aggregate/new_hash_index/mod.rs b/src/query/expression/src/aggregate/new_hash_index/mod.rs index a603a94c935c4..5dd96521c7e56 100644 --- a/src/query/expression/src/aggregate/new_hash_index/mod.rs +++ b/src/query/expression/src/aggregate/new_hash_index/mod.rs @@ -17,6 +17,6 @@ /// [hashbrown](https://github.com/rust-lang/hashbrown/tree/master) crate mod bitmask; mod group; -mod new_hash_index; +mod hash_index; -pub use new_hash_index::NewHashIndex; +pub use hash_index::NewHashIndex; From eb4e1e7725103c675b83b176f2810ffea309118b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 22 Jan 2026 20:44:22 +0800 Subject: [PATCH 19/25] perf: quad resize when capacity is small --- .../src/aggregate/aggregate_hashtable.rs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 23bcf9d234f62..b7cedb7916e3b 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -37,6 +37,8 @@ use crate::ColumnBuilder; use crate::ProjectedBlock; use crate::types::DataType; +const SMALL_CAPACITY_RESIZE_COUNT: usize = 4; + pub struct AggregateHashTable { pub payload: PartitionedPayload, // use for append rows directly during deserialize @@ -253,7 +255,8 @@ impl AggregateHashTable { ) -> usize { // exceed capacity or should resize if row_count + self.hash_index.count() > self.hash_index.resize_threshold() { - self.resize(self.hash_index.capacity() * 2); + let new_capacity = self.next_resize_capacity(); + self.resize(new_capacity); } let mut adapter = AdapterImpl { @@ -264,6 +267,26 @@ impl AggregateHashTable { .probe_and_create(state, row_count, &mut adapter) } + fn next_resize_capacity(&self) -> usize { + // Use *4 for the first few resizes, then switch back to *2. + // SMALL_CAPACITY_RESIZE_COUNT = 4: + // + // | Quad resizes used | Equivalent double-resize steps | + // | 0 | 0 | + // | 1 | 2 | + // | 2 | 4 | + // | 3 | 6 | + // | 4 | 8 | + // | 5 | 9 | + // | 6 | 10 | + let current = self.hash_index.capacity(); + if self.hash_index_resize_count < SMALL_CAPACITY_RESIZE_COUNT { + current * 4 + } else { + current * 2 + } + } + pub fn combine(&mut self, other: Self, flush_state: &mut PayloadFlushState) -> Result<()> { self.combine_payloads(&other.payload, flush_state) } @@ -395,11 +418,12 @@ impl AggregateHashTable { // scan payload to reconstruct PointArray fn resize(&mut self, new_capacity: usize) { if self.config.partial_agg { - if self.hash_index.capacity() == self.config.max_partial_capacity { + let target = new_capacity.min(self.config.max_partial_capacity); + if target == self.hash_index.capacity() { return; } self.hash_index_resize_count += 1; - self.hash_index = self.config.new_hash_index(new_capacity); + self.hash_index = self.config.new_hash_index(target); return; } From 0f7745c6673c8882b16514c61edfba6c5d1c228d Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 22 Jan 2026 20:55:59 +0800 Subject: [PATCH 20/25] optimistic try with the hash, then in a batch perf: optimistic try with the hash, then in a batch --- .../aggregate/new_hash_index/hash_index.rs | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index dfa3db82401f4..cd42298fb0d5b 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -69,9 +69,7 @@ impl NewHashIndex { } hash_index } -} -impl NewHashIndex { #[inline] fn ctrl(&mut self, index: usize) -> *mut Tag { debug_assert!(index < self.ctrls.len()); @@ -105,10 +103,11 @@ impl NewHashIndex { None } } +} - pub fn find_or_insert(&mut self, mut pos: usize, hash: u64) -> (usize, bool) { +impl NewHashIndex { + fn find_or_insert_batch(&mut self, mut pos: usize, tag_hash: Tag) -> (usize, bool) { let mut insert_index = None; - let tag_hash = Tag::full(hash); loop { let group = unsafe { Group::load(&self.ctrls, pos) }; if let Some(bit) = group.match_tag(tag_hash).into_iter().next() { @@ -125,6 +124,19 @@ impl NewHashIndex { } } + pub fn find_or_insert(&mut self, pos: usize, hash: u64) -> (usize, bool) { + let tag_hash = Tag::full(hash); + let ctrl = unsafe { *self.ctrl(pos) }; + if ctrl == tag_hash { + return (pos, false); + } + if ctrl == Tag::EMPTY { + self.set_ctrl(pos, tag_hash); + return (pos, true); + } + self.find_or_insert_batch(pos, tag_hash) + } + /// Probes the hash table for an empty slot using SIMD groups (batches) and sets the control byte. /// /// Returns the index of the found slot. @@ -269,3 +281,24 @@ impl HashIndexOps for NewHashIndex { NewHashIndex::probe_and_create(self, state, row_count, adapter) } } + +#[cfg(test)] +mod tests { + use super::NewHashIndex; + + #[test] + fn find_or_insert_inserts_on_empty_slot() { + let capacity = 8; + let mut index = NewHashIndex::with_capacity(capacity); + let hash = 0x1234_5678_9abc_def0_u64; + let pos = (hash as usize) & (capacity - 1); + + let (slot, is_new) = index.find_or_insert(pos, hash); + assert_eq!(slot, pos); + assert!(is_new); + + let (slot, is_new) = index.find_or_insert(pos, hash); + assert_eq!(slot, pos); + assert!(!is_new); + } +} From 4fb671e9494d7e7a86c3a43d4a3d10d46e30ea23 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 23 Jan 2026 14:05:27 +0800 Subject: [PATCH 21/25] Revert "optimistic try with the hash, then in a batch" This reverts commit 0f7745c6673c8882b16514c61edfba6c5d1c228d. --- .../aggregate/new_hash_index/hash_index.rs | 41 ++----------------- 1 file changed, 4 insertions(+), 37 deletions(-) diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index cd42298fb0d5b..dfa3db82401f4 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -69,7 +69,9 @@ impl NewHashIndex { } hash_index } +} +impl NewHashIndex { #[inline] fn ctrl(&mut self, index: usize) -> *mut Tag { debug_assert!(index < self.ctrls.len()); @@ -103,11 +105,10 @@ impl NewHashIndex { None } } -} -impl NewHashIndex { - fn find_or_insert_batch(&mut self, mut pos: usize, tag_hash: Tag) -> (usize, bool) { + pub fn find_or_insert(&mut self, mut pos: usize, hash: u64) -> (usize, bool) { let mut insert_index = None; + let tag_hash = Tag::full(hash); loop { let group = unsafe { Group::load(&self.ctrls, pos) }; if let Some(bit) = group.match_tag(tag_hash).into_iter().next() { @@ -124,19 +125,6 @@ impl NewHashIndex { } } - pub fn find_or_insert(&mut self, pos: usize, hash: u64) -> (usize, bool) { - let tag_hash = Tag::full(hash); - let ctrl = unsafe { *self.ctrl(pos) }; - if ctrl == tag_hash { - return (pos, false); - } - if ctrl == Tag::EMPTY { - self.set_ctrl(pos, tag_hash); - return (pos, true); - } - self.find_or_insert_batch(pos, tag_hash) - } - /// Probes the hash table for an empty slot using SIMD groups (batches) and sets the control byte. /// /// Returns the index of the found slot. @@ -281,24 +269,3 @@ impl HashIndexOps for NewHashIndex { NewHashIndex::probe_and_create(self, state, row_count, adapter) } } - -#[cfg(test)] -mod tests { - use super::NewHashIndex; - - #[test] - fn find_or_insert_inserts_on_empty_slot() { - let capacity = 8; - let mut index = NewHashIndex::with_capacity(capacity); - let hash = 0x1234_5678_9abc_def0_u64; - let pos = (hash as usize) & (capacity - 1); - - let (slot, is_new) = index.find_or_insert(pos, hash); - assert_eq!(slot, pos); - assert!(is_new); - - let (slot, is_new) = index.find_or_insert(pos, hash); - assert_eq!(slot, pos); - assert!(!is_new); - } -} From fb2707d799ff6c6b15014fccf3e094fd3b1c3cca Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 23 Jan 2026 14:57:41 +0800 Subject: [PATCH 22/25] fixup --- .../src/aggregate/aggregate_hashtable.rs | 19 ++++---- .../expression/src/aggregate/hash_index.rs | 9 ++-- src/query/expression/src/aggregate/mod.rs | 13 ++++-- .../src/aggregate/new_hash_index/bitmask.rs | 4 ++ .../aggregate/new_hash_index/hash_index.rs | 46 +++++++++++++++---- 5 files changed, 64 insertions(+), 27 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index b7cedb7916e3b..bdc833c0a1f18 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -14,28 +14,29 @@ // A new AggregateHashtable which inspired by duckdb's https://duckdb.org/2022/03/07/aggregate-hashtable.html -use std::sync::Arc; use std::sync::atomic::Ordering; +use std::sync::Arc; use bumpalo::Bump; use databend_common_exception::Result; -use super::BATCH_SIZE; -use super::HashTableConfig; -use super::LOAD_FACTOR; -use super::MAX_PAGE_SIZE; -use super::Payload; use super::group_hash_entries; use super::hash_index::AdapterImpl; use super::hash_index::HashIndexOps; use super::partitioned_payload::PartitionedPayload; use super::payload_flush::PayloadFlushState; use super::probe_state::ProbeState; +use super::HashTableConfig; +use super::Payload; +use super::BATCH_SIZE; +use super::LOAD_FACTOR; +use super::MAX_PAGE_SIZE; +use crate::aggregate::row_ptr::RowPtr; +use crate::types::DataType; use crate::AggregateFunctionRef; use crate::BlockEntry; use crate::ColumnBuilder; use crate::ProjectedBlock; -use crate::types::DataType; const SMALL_CAPACITY_RESIZE_COUNT: usize = 4; @@ -440,7 +441,9 @@ impl AggregateHashTable { }) }); - self.hash_index = self.config.rebuild_hash_index(new_capacity, iter); + self.hash_index = self + .config + .rebuild_hash_index(new_capacity, iter.collect::>()); } fn initial_capacity() -> usize { diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index 0959d693ec59e..bda3133187634 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -14,11 +14,11 @@ use std::fmt::Debug; -use super::LOAD_FACTOR; +use super::payload_row::CompareState; use super::PartitionedPayload; use super::ProbeState; use super::RowPtr; -use super::payload_row::CompareState; +use super::LOAD_FACTOR; use crate::ProjectedBlock; pub(super) struct HashIndex { @@ -75,11 +75,10 @@ impl HashIndex { } } - pub(super) fn rebuild_from_iter(capacity: usize, iter: I) -> Self - where I: IntoIterator { + pub(super) fn rebuild_from(capacity: usize, data: Vec<(u64, RowPtr)>) -> Self { let mut hash_index = HashIndex::with_capacity(capacity); - for (hash, row_ptr) in iter { + for (hash, row_ptr) in data { let slot = hash_index.probe_slot(hash); let entry = hash_index.mut_entry(slot); diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 5ddb5ba26ace8..77444eda4a02f 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -28,9 +28,9 @@ mod payload_row; mod probe_state; mod row_ptr; -use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use std::sync::Arc; pub use aggregate_function::*; pub use aggregate_function_state::*; @@ -187,12 +187,15 @@ impl HashTableConfig { } } - fn rebuild_hash_index(&self, capacity: usize, iter: I) -> Box - where I: IntoIterator { + fn rebuild_hash_index( + &self, + capacity: usize, + data: Vec<(u64, RowPtr)>, + ) -> Box { if self.enable_experiment_hash_index { - Box::new(NewHashIndex::rebuild_from_iter(capacity, iter)) + Box::new(NewHashIndex::rebuild_from(capacity, data)) } else { - Box::new(HashIndex::rebuild_from_iter(capacity, iter)) + Box::new(HashIndex::rebuild_from(capacity, data)) } } } diff --git a/src/query/expression/src/aggregate/new_hash_index/bitmask.rs b/src/query/expression/src/aggregate/new_hash_index/bitmask.rs index 386bc5e85fdf9..8ae04c3333d3f 100644 --- a/src/query/expression/src/aggregate/new_hash_index/bitmask.rs +++ b/src/query/expression/src/aggregate/new_hash_index/bitmask.rs @@ -91,6 +91,10 @@ impl Tag { let top7 = hash >> (8 * 8 - 7); Tag((top7 & 0x7f) as u8) // truncation } + + pub(super) fn is_empty(&self) -> bool { + *self == Tag::EMPTY + } } /// Helper function to replicate a tag across a `GroupWord`. #[inline] diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index dfa3db82401f4..7d9768e7be000 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -15,13 +15,13 @@ use std::hint::likely; -use crate::ProbeState; -use crate::aggregate::LOAD_FACTOR; use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; use crate::aggregate::new_hash_index::bitmask::Tag; use crate::aggregate::new_hash_index::group::Group; use crate::aggregate::row_ptr::RowPtr; +use crate::aggregate::LOAD_FACTOR; +use crate::ProbeState; pub struct NewHashIndex { ctrls: Vec, @@ -59,14 +59,41 @@ impl NewHashIndex { } } - pub fn rebuild_from_iter(capacity: usize, iter: I) -> Self - where I: IntoIterator { + const BATCH_PROBING_THRESHOLD: usize = 2_097_152; + + pub fn rebuild_from(capacity: usize, data: Vec<(u64, RowPtr)>) -> Self { let mut hash_index = NewHashIndex::with_capacity(capacity); - for (hash, row_ptr) in iter { - let slot = hash_index.probe_empty(hash); - hash_index.pointers[slot] = row_ptr; + + let len = data.len(); + let split_idx = if len <= Self::BATCH_PROBING_THRESHOLD { + len + } else { + len * 4 / 5 + }; + + let (part1, part2) = data.split_at(split_idx); + + for (hash, row_ptr) in part1 { + // scalar probing because the table is sparse + let slot = hash_index.probe_empty(*hash); + + // SAFETY: slot is guaranteed to be valid and empty + unsafe { + *hash_index.pointers.get_unchecked_mut(slot) = *row_ptr; + } hash_index.count += 1; } + + for (hash, row_ptr) in part2 { + let slot = hash_index.probe_empty_batch(*hash); + + // SAFETY: slot is guaranteed to be valid and empty + unsafe { + *hash_index.pointers.get_unchecked_mut(slot) = *row_ptr; + } + hash_index.count += 1; + } + hash_index } } @@ -151,11 +178,12 @@ impl NewHashIndex { /// overhead. When the map is sparse, we expect to find an empty slot almost immediately /// (often the first probe). In this specific situation, a simple scalar probe is faster pub fn probe_empty(&mut self, hash: u64) -> usize { + let tag_hash = Tag::full(hash); let mut pos = self.h1(hash); loop { let ctrl = unsafe { *self.ctrl(pos) }; - if ctrl == Tag::EMPTY { - self.set_ctrl(pos, Tag::full(hash)); + if likely(ctrl.is_empty()) { + self.set_ctrl(pos, tag_hash); return pos; } pos = (pos + 1) & self.bucket_mask; From f6c37af53c210b334b240812c2025f569a8fd92a Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 23 Jan 2026 14:58:03 +0800 Subject: [PATCH 23/25] fixup --- .../src/aggregate/aggregate_hashtable.rs | 16 ++++++++-------- src/query/expression/src/aggregate/hash_index.rs | 4 ++-- src/query/expression/src/aggregate/mod.rs | 2 +- .../src/aggregate/new_hash_index/hash_index.rs | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index bdc833c0a1f18..7a94d71dfd58a 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -14,29 +14,29 @@ // A new AggregateHashtable which inspired by duckdb's https://duckdb.org/2022/03/07/aggregate-hashtable.html -use std::sync::atomic::Ordering; use std::sync::Arc; +use std::sync::atomic::Ordering; use bumpalo::Bump; use databend_common_exception::Result; +use super::BATCH_SIZE; +use super::HashTableConfig; +use super::LOAD_FACTOR; +use super::MAX_PAGE_SIZE; +use super::Payload; use super::group_hash_entries; use super::hash_index::AdapterImpl; use super::hash_index::HashIndexOps; use super::partitioned_payload::PartitionedPayload; use super::payload_flush::PayloadFlushState; use super::probe_state::ProbeState; -use super::HashTableConfig; -use super::Payload; -use super::BATCH_SIZE; -use super::LOAD_FACTOR; -use super::MAX_PAGE_SIZE; -use crate::aggregate::row_ptr::RowPtr; -use crate::types::DataType; use crate::AggregateFunctionRef; use crate::BlockEntry; use crate::ColumnBuilder; use crate::ProjectedBlock; +use crate::aggregate::row_ptr::RowPtr; +use crate::types::DataType; const SMALL_CAPACITY_RESIZE_COUNT: usize = 4; diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index bda3133187634..15b4e3dfd5351 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -14,11 +14,11 @@ use std::fmt::Debug; -use super::payload_row::CompareState; +use super::LOAD_FACTOR; use super::PartitionedPayload; use super::ProbeState; use super::RowPtr; -use super::LOAD_FACTOR; +use super::payload_row::CompareState; use crate::ProjectedBlock; pub(super) struct HashIndex { diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 77444eda4a02f..1c19f8244efb3 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -28,9 +28,9 @@ mod payload_row; mod probe_state; mod row_ptr; +use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::Arc; pub use aggregate_function::*; pub use aggregate_function_state::*; diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index 7d9768e7be000..feb574cb6528c 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -15,13 +15,13 @@ use std::hint::likely; +use crate::ProbeState; +use crate::aggregate::LOAD_FACTOR; use crate::aggregate::hash_index::HashIndexOps; use crate::aggregate::hash_index::TableAdapter; use crate::aggregate::new_hash_index::bitmask::Tag; use crate::aggregate::new_hash_index::group::Group; use crate::aggregate::row_ptr::RowPtr; -use crate::aggregate::LOAD_FACTOR; -use crate::ProbeState; pub struct NewHashIndex { ctrls: Vec, From aa8aeccf548f794897d74813152a32271636aa26 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 23 Jan 2026 16:40:07 +0800 Subject: [PATCH 24/25] revert --- .../src/aggregate/aggregate_hashtable.rs | 31 +++++++++---------- .../expression/src/aggregate/hash_index.rs | 9 ++++++ src/query/expression/src/aggregate/mod.rs | 12 ------- .../aggregate/new_hash_index/hash_index.rs | 7 +++++ 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 7a94d71dfd58a..ef018cf939bd7 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -35,7 +35,6 @@ use crate::AggregateFunctionRef; use crate::BlockEntry; use crate::ColumnBuilder; use crate::ProjectedBlock; -use crate::aggregate::row_ptr::RowPtr; use crate::types::DataType; const SMALL_CAPACITY_RESIZE_COUNT: usize = 4; @@ -429,21 +428,21 @@ impl AggregateHashTable { } self.hash_index_resize_count += 1; - let iter = self.payload.payloads.iter().flat_map(|payload| { - let row_layout = &payload.row_layout; - let tuple_size = payload.tuple_size; - payload.pages.iter().flat_map(move |page| { - (0..page.rows).map(move |idx| { - let row_ptr = page.data_ptr(idx, tuple_size); - let hash = row_ptr.hash(row_layout); - (hash, row_ptr) - }) - }) - }); - - self.hash_index = self - .config - .rebuild_hash_index(new_capacity, iter.collect::>()); + + let mut hash_index = self.config.new_hash_index(new_capacity); + // iterate over payloads and copy to new entries + for payload in self.payload.payloads.iter() { + for page in payload.pages.iter() { + for idx in 0..page.rows { + let row_ptr = page.data_ptr(idx, payload.tuple_size); + let hash = row_ptr.hash(&payload.row_layout); + + hash_index.probe_slot_and_set(hash, row_ptr); + } + } + } + + self.hash_index = hash_index } fn initial_capacity() -> usize { diff --git a/src/query/expression/src/aggregate/hash_index.rs b/src/query/expression/src/aggregate/hash_index.rs index 15b4e3dfd5351..ee6ede60effc6 100644 --- a/src/query/expression/src/aggregate/hash_index.rs +++ b/src/query/expression/src/aggregate/hash_index.rs @@ -221,6 +221,7 @@ pub(super) trait HashIndexOps { row_count: usize, adapter: &mut dyn TableAdapter, ) -> usize; + fn probe_slot_and_set(&mut self, hash: u64, row_ptr: RowPtr); } impl HashIndex { @@ -332,6 +333,14 @@ impl HashIndexOps for HashIndex { ) -> usize { HashIndex::probe_and_create(self, state, row_count, adapter) } + + fn probe_slot_and_set(&mut self, hash: u64, row_ptr: RowPtr) { + let slot = HashIndex::probe_slot(self, hash); + let mut entry = self.mut_entry(slot); + entry.set_hash(hash); + entry.set_pointer(row_ptr); + self.count += 1; + } } pub(super) struct AdapterImpl<'a> { diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 1c19f8244efb3..8d31015ff689a 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -186,16 +186,4 @@ impl HashTableConfig { Box::new(HashIndex::dummy()) } } - - fn rebuild_hash_index( - &self, - capacity: usize, - data: Vec<(u64, RowPtr)>, - ) -> Box { - if self.enable_experiment_hash_index { - Box::new(NewHashIndex::rebuild_from(capacity, data)) - } else { - Box::new(HashIndex::rebuild_from(capacity, data)) - } - } } diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index feb574cb6528c..c67448b390c1a 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -296,4 +296,11 @@ impl HashIndexOps for NewHashIndex { ) -> usize { NewHashIndex::probe_and_create(self, state, row_count, adapter) } + + fn probe_slot_and_set(&mut self, hash: u64, row_ptr: RowPtr) { + let index = self.probe_empty(hash); + unsafe { + *self.pointers.get_unchecked_mut(index) = row_ptr; + } + } } From f87a71aaf55ede6f8304db9c14d06661f31a1c32 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 23 Jan 2026 16:48:23 +0800 Subject: [PATCH 25/25] revert --- src/query/expression/src/aggregate/new_hash_index/hash_index.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs index c67448b390c1a..a3acf3e8de3cd 100644 --- a/src/query/expression/src/aggregate/new_hash_index/hash_index.rs +++ b/src/query/expression/src/aggregate/new_hash_index/hash_index.rs @@ -302,5 +302,6 @@ impl HashIndexOps for NewHashIndex { unsafe { *self.pointers.get_unchecked_mut(index) = row_ptr; } + self.count += 1; } }