278 changes: 191 additions & 87 deletions datafusion/common/src/hash_utils.rs
@@ -17,9 +17,6 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
@@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
// redundant hashing for large dictionary elements (e.g. strings)
let dict_values = Arc::clone(array.values());
let dict_values = array.values();
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
create_hashes([dict_values], random_state, &mut dict_hashes)?;

// combine hash for each index in values
let dict_values = array.values();
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
let idx = key.as_usize();
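The comment at the top of this hunk describes the key optimization in `hash_dictionary`: hash each distinct dictionary value once, then reuse that hash for every key that points at it. Below is a minimal sketch of that idea, illustrative only and not the crate's code; it assumes the `arrow` and `ahash` crates and hashes the string values directly with `RandomState::hash_one` instead of going through `create_hashes`.

```rust
use ahash::RandomState;
use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // A dictionary with few distinct values ("a", "b") but many keys.
    let dict: DictionaryArray<Int32Type> =
        vec!["a", "b", "a", "a", "b"].into_iter().collect();
    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Hash each distinct dictionary value exactly once...
    let values = dict.values();
    let strings = values.as_any().downcast_ref::<StringArray>().unwrap();
    let dict_hashes: Vec<u64> = (0..strings.len())
        .map(|i| random_state.hash_one(strings.value(i)))
        .collect();

    // ...then reuse the precomputed hash for every key, instead of
    // rehashing the (potentially large) value for each row.
    let row_hashes: Vec<u64> = dict
        .keys()
        .iter()
        .map(|key| key.map(|k| dict_hashes[k as usize]).unwrap_or(0)) // 0 for null keys in this sketch
        .collect();

    assert_eq!(row_hashes.len(), dict.len());
    assert_eq!(row_hashes[0], row_hashes[2]); // both rows point at "a"
}
```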
@@ -308,11 +304,11 @@ fn hash_list_array(
where
OffsetSize: OffsetSizeTrait,
{
let values = Arc::clone(array.values());
let values = array.values();
let offsets = array.value_offsets();
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes([values], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
@@ -339,11 +335,11 @@ fn hash_fixed_list_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let values = Arc::clone(array.values());
let values = array.values();
let value_length = array.value_length() as usize;
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes([values], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for i in 0..array.len() {
if nulls.is_valid(i) {
@@ -366,83 +362,132 @@ fn hash_fixed_list_array(
Ok(())
}

/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
/// See comments on `hashes_buffer` for more details
/// Internal helper function that hashes a single array and either initializes or combines
/// the hash values in the buffer.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
array: &dyn Array,
random_state: &RandomState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
downcast_primitive_array! {
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(&as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(&as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(&array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
}
DataType::List(_) => {
let array = as_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
"Unsupported data type in hasher: {}",
array.data_type()
);
}
}
Ok(())
}

/// Test version of `hash_single_array` that forces all hashes to collide to zero.
#[cfg(feature = "force_hash_collisions")]
pub fn create_hashes<'a>(
_arrays: &[ArrayRef],
fn hash_single_array(
_array: &dyn Array,
_random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
hashes_buffer: &mut [u64],
_rehash: bool,
) -> Result<()> {
for hash in hashes_buffer.iter_mut() {
*hash = 0
}
Ok(hashes_buffer)
Ok(())
}

/// Something that can be returned as a `&dyn Array`.
///
/// We want `create_hashes` to accept either `&dyn Array` or `ArrayRef`,
/// and this seems the best way to do so.
///
/// We tried having it accept `AsRef<dyn Array>`,
/// but that is not (and cannot be) implemented for
/// `&dyn Array`, so callers holding a `&dyn Array` would not be
/// able to call `create_hashes` directly. This shim trait makes it possible.
pub trait AsDynArray {
Suggested change (inline review comment):
-pub trait AsDynArray {
+/// Something that can be returned as a `&dyn Array`
+///
+/// For some reason we can't use `AsRef<dyn Array>` because
+/// it is not implemented for `&dyn Array`
+pub trait AsDynArray {
fn as_dyn_array(&self) -> &dyn Array;
}

impl AsDynArray for dyn Array {
fn as_dyn_array(&self) -> &dyn Array {
self
}
}

impl AsDynArray for &dyn Array {
fn as_dyn_array(&self) -> &dyn Array {
*self
}
}

impl AsDynArray for ArrayRef {
fn as_dyn_array(&self) -> &dyn Array {
self.as_ref()
}
}

/// Creates hash values for every row, based on the values in the
/// columns.
impl AsDynArray for &ArrayRef {
fn as_dyn_array(&self) -> &dyn Array {
self.as_ref()
}
}
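To make the shim's purpose concrete, here is an illustrative sketch (the helper name `max_rows` is made up, not part of this PR) of a generic function using the same `IntoIterator<Item = T>, T: AsDynArray` bound that the new `create_hashes` uses. It assumes the trait remains public at `datafusion_common::hash_utils::AsDynArray` once this change lands; with the impls above, owned `ArrayRef`s, borrowed `&ArrayRef`s, and plain `&dyn Array`s all satisfy the bound.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
// Assumed import path for the trait added in this diff.
use datafusion_common::hash_utils::AsDynArray;

// Generic over anything that can be viewed as `&dyn Array`.
fn max_rows<I, T>(arrays: I) -> usize
where
    I: IntoIterator<Item = T>,
    T: AsDynArray,
{
    arrays
        .into_iter()
        .map(|a| a.as_dyn_array().len())
        .max()
        .unwrap_or(0)
}

fn main() {
    let owned: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let ints = Int32Array::from(vec![4, 5, 6, 7]);
    let borrowed: &dyn Array = &ints;

    // All of these call shapes satisfy the same bound:
    assert_eq!(max_rows([Arc::clone(&owned)]), 3); // [ArrayRef] by value
    assert_eq!(max_rows(&[owned]), 3);             // &[ArrayRef] yields &ArrayRef
    assert_eq!(max_rows([borrowed]), 4);           // [&dyn Array]
}
```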

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately
#[cfg(not(feature = "force_hash_collisions"))]
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
/// `hashes_buffer` should be pre-sized appropriately.
pub fn create_hashes<'a, I, T>(
arrays: I,
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
) -> Result<&'a mut Vec<u64>>
where
I: IntoIterator<Item = T>,
T: AsDynArray,
{
for (i, array) in arrays.into_iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
downcast_primitive_array! {
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(&as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(&as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(&array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
}
DataType::List(_) => {
let array = as_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
"Unsupported data type in hasher: {}",
col.data_type()
);
}
}
hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
}
Ok(hashes_buffer)
}
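With the generic signature above, callers no longer need to build a `&[ArrayRef]` slice just to hash borrowed arrays. The following usage sketch mirrors the new tests further down; it is not part of the PR itself and assumes the `arrow`, `ahash`, and `datafusion_common` crates with `create_hashes` at its current public path.

```rust
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
use datafusion_common::hash_utils::create_hashes;

fn main() -> datafusion_common::Result<()> {
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let floats: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Existing call shape still works: a slice of owned ArrayRefs.
    let mut hashes = vec![0u64; ints.len()];
    create_hashes(
        &[Arc::clone(&ints), Arc::clone(&floats)],
        &random_state,
        &mut hashes,
    )?;

    // New call shape: borrowed `&dyn Array` columns, no ArrayRef slice required.
    let mut hashes2 = vec![0u64; ints.len()];
    create_hashes([&*ints, &*floats], &random_state, &mut hashes2)?;

    // Same columns, same seeds: identical row hashes either way.
    assert_eq!(hashes, hashes2);
    Ok(())
}
```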
@@ -465,7 +510,7 @@ mod tests {
.collect::<Decimal128Array>()
.with_precision_and_scale(20, 3)
.unwrap();
let array_ref = Arc::new(array);
let array_ref: ArrayRef = Arc::new(array);
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; array_ref.len()];
let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
@@ -478,15 +523,21 @@
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; 0];
let hashes = create_hashes(&[Arc::new(empty_array)], &random_state, hashes_buff)?;
let hashes = create_hashes(
&[Arc::new(empty_array) as ArrayRef],
&random_state,
hashes_buff,
)?;
assert_eq!(hashes, &Vec::<u64>::new());
Ok(())
}

#[test]
fn create_hashes_for_float_arrays() -> Result<()> {
let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
let f32_arr: ArrayRef =
Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
let f64_arr: ArrayRef =
Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; f32_arr.len()];
@@ -514,8 +565,10 @@ mod tests {
Some(b"Longer than 12 bytes string"),
];

let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());
let binary_array: ArrayRef =
Arc::new(binary.iter().cloned().collect::<$ARRAY>());
let ref_array: ArrayRef =
Arc::new(binary.iter().cloned().collect::<BinaryArray>());

let random_state = RandomState::with_seeds(0, 0, 0, 0);

@@ -553,7 +606,7 @@ mod tests {
#[test]
fn create_hashes_fixed_size_binary() -> Result<()> {
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
let fixed_size_binary_array =
let fixed_size_binary_array: ArrayRef =
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
@@ -580,8 +633,9 @@ mod tests {
Some("Longer than 12 bytes string"),
];

let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings.iter().cloned().collect::<$ARRAY>());
let dict_array: ArrayRef = Arc::new(
strings
.iter()
.cloned()
@@ -629,8 +683,9 @@ mod tests {
fn create_hashes_for_dict_arrays() {
let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];

let string_array = Arc::new(strings.iter().cloned().collect::<StringArray>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings.iter().cloned().collect::<StringArray>());
let dict_array: ArrayRef = Arc::new(
strings
.iter()
.cloned()
@@ -865,8 +920,9 @@ mod tests {
let strings1 = [Some("foo"), None, Some("bar")];
let strings2 = [Some("blarg"), Some("blah"), None];

let string_array = Arc::new(strings1.iter().cloned().collect::<StringArray>());
let dict_array = Arc::new(
let string_array: ArrayRef =
Arc::new(strings1.iter().cloned().collect::<StringArray>());
let dict_array: ArrayRef = Arc::new(
strings2
.iter()
.cloned()
@@ -896,4 +952,52 @@ mod tests {

assert_ne!(one_col_hashes, two_col_hashes);
}

#[test]
fn test_create_hashes_from_arrays() {
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array: ArrayRef =
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; int_array.len()];
let hashes =
create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
assert_eq!(hashes.len(), 4,);
}

#[test]
fn test_create_hashes_from_dyn_arrays() {
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array: ArrayRef =
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

// Verify that we can call create_hashes with only &dyn Array
fn test(arr1: &dyn Array, arr2: &dyn Array) {
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; arr1.len()];
let hashes = create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
assert_eq!(hashes.len(), 4,);
}
test(&*int_array, &*float_array);
}

#[test]
fn test_create_hashes_equivalence() {
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut hashes1 = vec![0; array.len()];
create_hashes(
&[Arc::clone(&array) as ArrayRef],
&random_state,
&mut hashes1,
)
.unwrap();

let mut hashes2 = vec![0; array.len()];
create_hashes([array], &random_state, &mut hashes2).unwrap();

assert_eq!(hashes1, hashes2);
}
}