Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 86 additions & 51 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
// redundant hashing for large dictionary elements (e.g. strings)
let dict_values = array.values();
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes_from_arrays(&[dict_values.as_ref()], random_state, &mut dict_hashes)?;
create_hashes([dict_values], random_state, &mut dict_hashes)?;

// combine hash for each index in values
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
Expand Down Expand Up @@ -308,7 +308,7 @@ where
let offsets = array.value_offsets();
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
create_hashes([values], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
Expand Down Expand Up @@ -339,7 +339,7 @@ fn hash_fixed_list_array(
let value_length = array.value_length() as usize;
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
create_hashes([values], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for i in 0..array.len() {
if nulls.is_valid(i) {
Expand Down Expand Up @@ -434,41 +434,51 @@ fn hash_single_array(
Ok(())
}

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately
///
/// This is the same as [`create_hashes`] but accepts `&dyn Array`s instead of requiring
/// `ArrayRef`s.
pub fn create_hashes_from_arrays<'a>(
arrays: &[&dyn Array],
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, &array) in arrays.iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
hash_single_array(array, random_state, hashes_buffer, rehash)?;
pub trait AsDynArray {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we can't implement this as a blanket impl for `T: AsRef<dyn Array>`?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The answer is no:

conflicting implementations of trait `AsDynArray` for type `&dyn arrow::array::Array`
upstream crates may add a new impl of trait `std::convert::AsRef<(dyn arrow::array::Array + 'static)>` for type `&dyn arrow::array::Array` in future versions

🤷🏻

fn as_dyn_array(&self) -> &dyn Array;
}

/// Identity case: a bare `dyn Array` is already the trait object that
/// `create_hashes` passes on to `hash_single_array`, so the borrow is
/// handed back unchanged.
impl AsDynArray for dyn Array {
fn as_dyn_array(&self) -> &dyn Array {
self
}
}

/// Allows callers that hold only a `&dyn Array` (and no `ArrayRef`) to pass
/// it straight to `create_hashes`, e.g. `create_hashes([arr1, arr2], ..)`.
impl AsDynArray for &dyn Array {
fn as_dyn_array(&self) -> &dyn Array {
// `self` is `&&dyn Array` here; strip the outer borrow.
*self
}
}

/// Allows iterators that yield owned `ArrayRef` values to be passed to
/// `create_hashes`.
impl AsDynArray for ArrayRef {
fn as_dyn_array(&self) -> &dyn Array {
// Borrow the `dyn Array` held by the `ArrayRef`.
self.as_ref()
}
}

impl AsDynArray for &ArrayRef {
fn as_dyn_array(&self) -> &dyn Array {
self.as_ref()
}
Ok(hashes_buffer)
}

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately.
///
/// This is the same as [`create_hashes_from_arrays`] but accepts `ArrayRef`s.
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
pub fn create_hashes<'a, I, T>(
arrays: I,
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, array) in arrays.iter().enumerate() {
) -> Result<&'a mut Vec<u64>>
where
I: IntoIterator<Item = T>,
T: AsDynArray,
{
for (i, array) in arrays.into_iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
hash_single_array(array.as_ref(), random_state, hashes_buffer, rehash)?;
hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
}
Ok(hashes_buffer)
}
Expand All @@ -491,7 +501,7 @@ mod tests {
.collect::<Decimal128Array>()
.with_precision_and_scale(20, 3)
.unwrap();
let array_ref = Arc::new(array);
let array_ref: ArrayRef = Arc::new(array);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major user-facing change: if you create an `Arc<SomeArray>` and pass it to `create_hashes`, before this PR type inference would automatically coerce it to `ArrayRef`. Now you need to explicitly declare that you want an `ArrayRef`.

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; array_ref.len()];
let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
Expand All @@ -504,15 +514,21 @@ mod tests {
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; 0];
let hashes = create_hashes(&[Arc::new(empty_array)], &random_state, hashes_buff)?;
let hashes = create_hashes(
&[Arc::new(empty_array) as ArrayRef],
&random_state,
hashes_buff,
)?;
assert_eq!(hashes, &Vec::<u64>::new());
Ok(())
}

#[test]
fn create_hashes_for_float_arrays() -> Result<()> {
let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
let f32_arr: ArrayRef =
Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
let f64_arr: ArrayRef =
Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; f32_arr.len()];
Expand Down Expand Up @@ -540,8 +556,10 @@ mod tests {
Some(b"Longer than 12 bytes string"),
];

let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());
let binary_array: ArrayRef =
Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array: ArrayRef =
Arc::new(binary.iter().cloned().collect::<BinaryArray>());

let random_state = RandomState::with_seeds(0, 0, 0, 0);

Expand Down Expand Up @@ -579,7 +597,7 @@ mod tests {
#[test]
fn create_hashes_fixed_size_binary() -> Result<()> {
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
let fixed_size_binary_array =
let fixed_size_binary_array: ArrayRef =
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
Expand All @@ -606,8 +624,9 @@ mod tests {
Some("Longer than 12 bytes string"),
];

let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings.iter().cloned().collect::<$ARRAY>());
let dict_array: ArrayRef = Arc::new(
strings
.iter()
.cloned()
Expand Down Expand Up @@ -655,8 +674,9 @@ mod tests {
fn create_hashes_for_dict_arrays() {
let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];

let string_array = Arc::new(strings.iter().cloned().collect::<StringArray>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings.iter().cloned().collect::<StringArray>());
let dict_array: ArrayRef = Arc::new(
strings
.iter()
.cloned()
Expand Down Expand Up @@ -891,8 +911,9 @@ mod tests {
let strings1 = [Some("foo"), None, Some("bar")];
let strings2 = [Some("blarg"), Some("blah"), None];

let string_array = Arc::new(strings1.iter().cloned().collect::<StringArray>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings1.iter().cloned().collect::<StringArray>());
let dict_array: ArrayRef = Arc::new(
strings2
.iter()
.cloned()
Expand Down Expand Up @@ -925,23 +946,38 @@ mod tests {

#[test]
fn test_create_hashes_from_arrays() {
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array: ArrayRef =
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; int_array.len()];
let hashes = create_hashes_from_arrays(
&[int_array.as_ref(), float_array.as_ref()],
&random_state,
hashes_buff,
)
.unwrap();
let hashes =
create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
assert_eq!(hashes.len(), 4,);
}

#[test]
fn test_create_hashes_from_dyn_arrays() {
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array: ArrayRef =
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

// Verify that we can call create_hashes with only &dyn Array
fn test(arr1: &dyn Array, arr2: &dyn Array) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a test that calls `create_hashes` with only `&dyn Array` arguments.

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; arr1.len()];
let hashes =
create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
assert_eq!(hashes.len(), 4,);
}
test(&*int_array, &*float_array);
}


#[test]
fn test_create_hashes_equivalence() {
let array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut hashes1 = vec![0; array.len()];
Expand All @@ -953,8 +989,7 @@ mod tests {
.unwrap();

let mut hashes2 = vec![0; array.len()];
create_hashes_from_arrays(&[array.as_ref()], &random_state, &mut hashes2)
.unwrap();
create_hashes([array], &random_state, &mut hashes2).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this works. The whole point is that I want to call create_hashes from a function where I only have &dyn Array, which does not implement AsRef<dyn Array>.

As far as I know there's no way to convert a `&dyn Array` into something that implements `AsRef<dyn Array>` without some sort of wrapper type:

struct AsRefWrapper<'a>(&'a dyn Array);

impl<'a> AsRef<dyn Array> for AsRefWrapper<'a> {
    fn as_ref(&self) -> &dyn Array {
        self.0
    }
}

Or something like that. Which seems... a bit annoying to have to do at each call site.
I think I tried the generics method and ended up with two functions because of this limitation.
Do you know a better way around this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see

I think we should at least add a test for the specific usecase. Here is what I have:

    #[test]
    fn test_create_hashes_from_dyn_arrays() {
        let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let float_array: ArrayRef =
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

        // Verify that we can call create_hashes with only &dyn Array
        fn test(arr1: &dyn Array, arr2: &dyn Array) {
            let random_state = RandomState::with_seeds(0, 0, 0, 0);
            let hashes_buff = &mut vec![0; arr1.len()];
            let hashes =
                create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
            assert_eq!(hashes.len(), 4,);
        }
        test(&*int_array, &*float_array);
    }

Maybe we can do this with a special trait -- I'll play around with it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a special trait for this -- I don't really know why it is necessary but it does seem to work 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creative solution!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"creative" could either be good or bad in this context 😆

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think good! It's an internal implementation detail that users won't have to think about, but it allows us to make the improvements to this function for all callers without introducing a new API. It's good 👍🏻


assert_eq!(hashes1, hashes2);
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_err};
use crate::format::DEFAULT_CAST_OPTIONS;
use crate::hash_utils::create_hashes_from_arrays;
use crate::hash_utils::create_hashes;
use crate::utils::SingleRowListArrayBuilder;
use crate::{_internal_datafusion_err, arrow_datafusion_err};
use arrow::array::{
Expand Down Expand Up @@ -880,7 +880,7 @@ fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
let len = arr.len();
let hashes_buffer = &mut vec![0; len];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
let hashes = create_hashes_from_arrays(&[arr.as_ref()], &random_state, hashes_buffer)
let hashes = create_hashes(&[arr], &random_state, hashes_buffer)
.expect("hash_nested_array: failed to create row hashes");
// Hash back to std::hash::Hasher
hashes.hash(state);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::array::{
};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
use std::any::type_name;
use std::fmt::Debug;
Expand Down Expand Up @@ -349,7 +349,7 @@ where
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(values.len(), 0);
create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
create_hashes([values], &self.random_state, batch_hashes)
// hash is supported for all types and create_hashes only
// returns errors for unsupported types
.unwrap();
Expand Down
7 changes: 3 additions & 4 deletions datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
//! `StringViewArray`/`BinaryViewArray`.
//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the
//! [`GenericByteViewBuilder`].
use crate::binary_map::OutputType;
use ahash::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
use std::fmt::Debug;
use std::sync::Arc;

use crate::binary_map::OutputType;

/// HashSet optimized for storing string or binary values that can produce that
/// the final set as a `GenericBinaryViewArray` with minimal copies.
#[derive(Debug)]
Expand Down Expand Up @@ -243,7 +242,7 @@ where
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(values.len(), 0);
create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
create_hashes([values], &self.random_state, batch_hashes)
// hash is supported for all types and create_hashes only
// returns errors for unsupported types
.unwrap();
Expand Down
26 changes: 5 additions & 21 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ mod tests {
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
Expand Down Expand Up @@ -3454,11 +3454,7 @@ mod tests {

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes_from_arrays(
&[left.columns()[0].as_ref()],
&random_state,
hashes_buff,
)?;
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes the callsites look much nicer


// Maps both values to both indices (1 and 2, representing input 0 and 1)
// 0 -> (0, 1)
Expand Down Expand Up @@ -3487,11 +3483,7 @@ mod tests {
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes_from_arrays(
&[right_keys_values.as_ref()],
&random_state,
&mut hashes_buffer,
)?;
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
Expand Down Expand Up @@ -3525,11 +3517,7 @@ mod tests {

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes_from_arrays(
&[left.columns()[0].as_ref()],
&random_state,
hashes_buff,
)?;
let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?;

hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
Expand All @@ -3552,11 +3540,7 @@ mod tests {
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes_from_arrays(
&[right_keys_values.as_ref()],
&random_state,
&mut hashes_buffer,
)?;
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
Expand Down
Loading