Refactor InListExpr to support structs by re-using existing hashing infrastructure #18449
Conversation
// TODO: serialize the inner ArrayRef directly to avoid materialization into literals
// by extending the protobuf definition to support both representations and adding a public
// accessor method to InListExpr to get the inner ArrayRef
I'll create a followup issue once we merge this
05)--------ProjectionExec: expr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
This is because we now support Utf8View for building the sets 😄
let random_state = RandomState::with_seed(0);
let mut hashes_buf = vec![0u64; array.len()];
let Ok(hashes_buf) = create_hashes_from_arrays(
    &[array.as_ref()],
    &random_state,
    &mut hashes_buf,
) else {
    unreachable!("Failed to create hashes for InList array. This shouldn't happen because make_set should have succeeded earlier.");
};
hashes_buf.hash(state);
We could pre-compute and store a hash: u64 which would be both more performant when Hash is called and avoid this error, but it would add more complexity and some overhead when building the InListExpr
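For illustration, a hedged sketch of that alternative; the struct and field names below are hypothetical and not part of this PR:

```rust
use std::hash::{Hash, Hasher};

use arrow::array::ArrayRef;

/// Hypothetical variant of the set storage that caches a u64 digest computed
/// once at construction, so the `Hash` impl never re-hashes the whole array.
struct PrecomputedArraySet {
    /// The deduplicated IN-list values.
    array: ArrayRef,
    /// Digest of `array`, computed once (e.g. with create_hashes_from_arrays)
    /// when the InListExpr is built.
    precomputed_hash: u64,
}

impl Hash for PrecomputedArraySet {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Cheap and infallible: feed the cached digest instead of hashing rows.
        self.precomputed_hash.hash(state);
    }
}
```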
## Background
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393. There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:
- (This PR): apache#18448
- apache#18449 (depends on apache#18448)
- apache#18451

## Changes in this PR
Change create_hashes and related functions to work with &dyn Array references instead of requiring ArrayRef (Arc-wrapped arrays). This avoids unnecessary Arc::clone() calls and enables callers that only have an &dyn Array to use the hashing utilities.
- Add create_hashes_from_arrays(&[&dyn Array]) function
- Refactor hash_dictionary, hash_list_array, hash_fixed_list_array to use references instead of cloning
- Extract hash_single_array() helper for common logic

Co-authored-by: Andrew Lamb <[email protected]>
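For reference, a minimal usage sketch of `create_hashes_from_arrays`, based on the call shape visible in this PR's diff; the `datafusion_common::hash_utils` module path and the use of `ahash::RandomState` are assumptions, not confirmed by this excerpt:

```rust
use ahash::RandomState;
use arrow::array::{Array, Int32Array, StringArray};
use datafusion_common::hash_utils::create_hashes_from_arrays;

fn hash_columns() -> datafusion_common::Result<Vec<u64>> {
    let ids = Int32Array::from(vec![1, 2, 3]);
    let names = StringArray::from(vec!["a", "b", "c"]);
    // Plain &dyn Array references: no Arc wrapping or Arc::clone() required.
    let arrays: [&dyn Array; 2] = [&ids, &names];
    let random_state = RandomState::with_seed(0);
    let mut hashes = vec![0u64; ids.len()];
    // One combined hash per row across both columns.
    create_hashes_from_arrays(&arrays, &random_state, &mut hashes)?;
    Ok(hashes)
}
```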
/// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to
/// create these structures from an predicate (boolean expression).
fn new<'a>(
fn new(
I think it's worth discussing in this review how far we propagate the changes.
In particular, InListExpr will now have two operation modes:

1. It was built with an `ArrayRef`, or was able to build an `ArrayRef` from a homogeneously typed `Vec<Arc<dyn PhysicalExpr>>` that are all literals.
2. It was built with a `Vec<Arc<dyn PhysicalExpr>>` that are not literals or not homogeneously typed.

If we restrict LiteralGuarantee to only operate on the first case, I think we could lift out a lot of computation: instead of transforming `ArrayRef -> Vec<Arc<dyn PhysicalExpr>> -> Vec<ScalarValue> -> HashSet<ScalarValue>`, which then gets fed into bloom filters that are per-column and don't really support heterogeneous `ScalarValue`s, we could re-use the already deduplicated `ArraySet` that InListExpr has internally, or something along those lines. The ultimate thing to do, though it would require even more work and changes, would be to make `PruningPredicate::contains` accept an `enum ArrayOrScalars { Array(ArrayRef), Scalars(Vec<ScalarValue>) }` so that we can push down and iterate over the Arc'ed `ArrayRef` the whole way down. I think we could make this backwards compatible.

I think that change is worth it, but it requires a bit more coordination (with arrow-rs) and a bigger change.
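Purely for illustration, a minimal sketch of that enum idea; it is not part of this PR and the names and methods are hypothetical:

```rust
use arrow::array::ArrayRef;
use datafusion_common::ScalarValue;

/// Hypothetical enum from the discussion above. It would let pruning and
/// bloom-filter code accept either an already materialized array (mode 1)
/// or plain scalars (mode 2).
pub enum ArrayOrScalars {
    /// Homogeneously typed literal values kept as one Arrow array.
    Array(ArrayRef),
    /// Heterogeneous or non-literal values; pushdown would bail out early.
    Scalars(Vec<ScalarValue>),
}

impl ArrayOrScalars {
    /// Mode (1) accessor, mirroring the `list() -> Option<ArrayRef>` idea above.
    pub fn as_array(&self) -> Option<&ArrayRef> {
        match self {
            ArrayOrScalars::Array(array) => Some(array),
            ArrayOrScalars::Scalars(_) => None,
        }
    }
}
```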
The end result would be that:

1. When the InListExpr operates in mode (1), we are able to push down into bloom filters with no data copies at all.
2. When the InListExpr operates in mode (2), we'd bail on the pushdown early (e.g. `list() -> Option<ArrayRef>`) and avoid building the `HashSet<ScalarValue>`, etc. that won't be used.
Wdyt @alamb ?
Okay, I've looked into this and it is entirely possible; I think we should do it.

Basically, the status quo is that we always try to build an ArrayHashSet, which is only possible if we can convert the `Vec<ScalarValue>` into an `ArrayRef`.

At that point the only reason to store the `Vec<ScalarValue>` is to later pass it into PruningPredicate -> bloom filters and LiteralGuarantee. If we can refactor those two to also handle an `ArrayRef`, we could probably avoid a lot of cloning and make things more efficient by using arrays. I don't even think we need to support `Vec<ScalarValue>` at all: the only reason to have it is if you could not build a homogeneously typed array, and in that case you probably can't do any sort of pushdown into a bloom filter. E.g. `select variant_get(col, 'abc') in (1, 2.0, 'c')` might make sense and work, but I don't think we could ever push that down into a bloom filter. So InListExpr needs to handle both, but I don't think the pruning machinery does.

So anyway, I think I may try to reduce this change to only be about using create_hashes and leave any inefficiencies as a TODO for a follow-up issue. At the end of the day, if we can make HashJoinExec faster, even with some inefficiencies, I think that's okay and we can improve more later.
I'll record a preview of some of the changes I had made to explore this (by no means ready) just for future reference: https://github.com/pydantic/datafusion/compare/refactor-in-list...pydantic:datafusion:use-array-in-pruning?expand=1
pub trait Set: Send + Sync {
    fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
    fn has_nulls(&self) -> bool;
}
We get rid of the `Set` trait. The only implementer was `ArraySet`.
array => Arc::new(ArraySet::new(array, make_hash_set(array))),
DataType::Boolean => {
    let array = as_boolean_array(array)?;
    Arc::new(ArraySet::new(array, make_hash_set(array)))
},
We get rid of this type matching logic
trait IsEqual: HashValue {
    fn is_equal(&self, other: &Self) -> bool;
}

impl<T: IsEqual + ?Sized> IsEqual for &T {
    fn is_equal(&self, other: &Self) -> bool {
        T::is_equal(self, other)
    }
}
We get rid of these custom equality / hash traits
Changes:
- Enhance InListExpr to efficiently store homogeneous lists as arrays and avoid a conversion to `Vec<PhysicalExpr>` by adding an internal InListStorage enum with Array and Exprs variants
- Re-use existing hashing and comparison utilities to support Struct arrays and other complex types
- Add public function `in_list_from_array(expr, list_array, negated)` for creating InList from arrays
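A rough usage sketch of the new constructor; the module path, parameter types, and return type are assumptions modeled on the existing `in_list` helper and are not confirmed by this excerpt:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::{col, in_list_from_array};

fn name_in_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
    // The IN-list values stay as one Arrow array; no Vec<ScalarValue> detour.
    let list: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    // negated = false builds `name IN ('a', 'b', 'c')`.
    let expr = in_list_from_array(col("name", &schema)?, list, false)?;
    println!("{expr}");
    Ok(())
}
```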
@alamb could you run the benchmark again? Also TPC-H would be great since the plans did change (I think for the better). I'm getting promising results locally:
A PR to port this to arrow itself: apache/arrow-rs#8814
/// Maximum size for the thread-local hash buffer before truncation (4MB = 524,288 u64 elements).
/// The goal of this is to avoid unbounded memory growth that would appear as a memory leak.
/// We allow temporary allocations beyond this size, but after use the buffer is truncated
/// to this size.
const MAX_BUFFER_SIZE: usize = 524_288;

thread_local! {
    /// Thread-local buffer for hash computations to avoid repeated allocations.
    /// The buffer is reused across calls and truncated if it exceeds MAX_BUFFER_SIZE.
    /// Defaults to a capacity of 8192 u64 elements which is the default batch size.
    /// This corresponds to 64KB of memory.
    static HASH_BUFFER: RefCell<Vec<u64>> = RefCell::new(Vec::with_capacity(8192));
}

/// Creates hashes for the given arrays using a thread-local buffer, then calls the provided callback
/// with an immutable reference to the computed hashes.
///
/// This function manages a thread-local buffer to avoid repeated allocations. The buffer is automatically
/// truncated if it exceeds [`MAX_BUFFER_SIZE`] after use.
///
/// # Arguments
/// * `arrays` - The arrays to hash (must contain at least one array)
/// * `random_state` - The random state for hashing
/// * `callback` - A function that receives an immutable reference to the hash slice and returns a result
///
/// # Errors
/// Returns an error if:
/// - No arrays are provided
/// - The function is called reentrantly (i.e., the callback invokes `with_hashes` again on the same thread)
/// - The function is called during or after thread destruction
///
/// # Example
/// ```ignore
/// use datafusion_common::hash_utils::{with_hashes, RandomState};
/// use arrow::array::{Int32Array, ArrayRef};
/// use std::sync::Arc;
///
/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
/// let random_state = RandomState::new();
///
/// let result = with_hashes([&array], &random_state, |hashes| {
///     // Use the hashes here
///     Ok(hashes.len())
/// })?;
/// ```
pub fn with_hashes<I, T, F, R>(
    arrays: I,
    random_state: &RandomState,
    callback: F,
) -> Result<R>
where
    I: IntoIterator<Item = T>,
    T: AsDynArray,
    F: FnOnce(&[u64]) -> Result<R>,
{
    // Peek at the first array to determine buffer size without fully collecting
    let mut iter = arrays.into_iter().peekable();

    // Get the required size from the first array
    let required_size = match iter.peek() {
        Some(arr) => arr.as_dyn_array().len(),
        None => return _internal_err!("with_hashes requires at least one array"),
    };

    HASH_BUFFER.try_with(|cell| {
        let mut buffer = cell.try_borrow_mut()
            .map_err(|_| _internal_datafusion_err!("with_hashes cannot be called reentrantly on the same thread"))?;

        // Ensure buffer has sufficient length, clearing old values
        buffer.clear();
        buffer.resize(required_size, 0);

        // Create hashes in the buffer - this consumes the iterator
        create_hashes(iter, random_state, &mut buffer[..required_size])?;

        // Execute the callback with an immutable slice
        let result = callback(&buffer[..required_size])?;

        // Cleanup: truncate if buffer grew too large
        if buffer.capacity() > MAX_BUFFER_SIZE {
            buffer.truncate(MAX_BUFFER_SIZE);
            buffer.shrink_to_fit();
        }

        Ok(result)
    }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access thread-local storage during or after thread destruction"))?
}
This can be its own PR if we really want to reduce the diff. Then we could also use it in all of the other call sites that have similar patterns. Otherwise I will do that as a follow-up to avoid an extra sequencing step.
🤖: Benchmark completed
🤖: Benchmark completed
@alamb I'm not sure what's going on with these benchmarks. @friendlymatthew and I both ran it independently and got mostly big speedups as per above. Could you maybe try on your local machine? Here's how I've been running them: `git checkout main && git log -1 --oneline && cargo bench --bench in_list -- --save-baseline before && git checkout refactor-in-list && git log -1 --oneline && cargo bench --bench in_list -- --baseline before`
🤖: Benchmark completed
I don't know what is going on with the benchmarks 🤔 I ran this on my laptop: gh pr checkout https://github.com/apache/datafusion/pull/18449 && git log -1 --oneline && cargo bench --bench in_list -- --save-baseline in_list && git checkout `git merge-base apache/main head` && git log -1 --oneline && cargo bench --bench in_list -- --save-baseline before
Then critcmp reports:
So something is up with the automated benchmarks? If I'm reading your report correctly, this branch is faster in practically all cases, including the ones the automated benchmarks reported as slower. Am I missing something here?
Could it somehow be architecture-related? I assume we both have ARM MacBooks while the automated runner is x86?
That is my reading of the benchmarks too
It is a good theory -- I am rerunning the same commands on my benchmarking machine (GCP c2-standard-8, 8 vCPUs, 32 GB memory) and will report results.
Here are the results from the GCP runner (aka I can reproduce with my script and it shows several cases much slower). I am also going to see if
Results are the same when I used target-native.
Crazy stuff. So this is something like 70% faster on ARM but 70% slower on x86. I have not faced this sort of thing before. Will try to investigate.
On Wed, Nov 12, 2025 at 3:42 AM Andrew Lamb (@alamb) wrote:

> I could reproduce the slowdown on a local x86_64 dedicated machine (windows!) so I think it is reasonable to conclude this is something related to the platform

## Background
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.
A "target state" is tracked in #18393.
There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:
- #18448
- #18449 (this PR, depends on #18448)
- #18451 (… `HashJoinExec` and use CASE expressions for more precise filters)

## Changes in this PR
- Enhance InListExpr to efficiently store homogeneous lists as arrays and avoid a conversion to `Vec<PhysicalExpr>` by adding an internal InListStorage enum with Array and Exprs variants
- Re-use existing hashing and comparison utilities to support Struct arrays and other complex types
- Add public function `in_list_from_array(expr, list_array, negated)` for creating InList from arrays

Although the diff looks large, most of it is actually tests and docs. I think the actual code change is a negative LOC change, or at least negative complexity (it eliminates a trait, a macro, and matching on data types).
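For illustration, a rough sketch of what the internal storage enum described above could look like; the actual definition in the PR may differ in naming and extra bookkeeping fields:

```rust
use std::sync::Arc;

use arrow::array::ArrayRef;
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical sketch of InListExpr's internal storage described above.
enum InListStorage {
    /// Homogeneously typed literal values kept as a single Arrow array.
    Array(ArrayRef),
    /// Arbitrary expressions that are not literals or not homogeneously typed.
    Exprs(Vec<Arc<dyn PhysicalExpr>>),
}
```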