Skip to content

Commit a14159a

Browse files
adriangbalamb
andcommitted
Refactor create_hashes to accept array references
Co-authored-by: Andrew Lamb <[email protected]>
1 parent f57da83 commit a14159a

File tree

5 files changed

+191
-113
lines changed

5 files changed

+191
-113
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 182 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
//! Functionality used both on logical and physical plans
1919
20-
#[cfg(not(feature = "force_hash_collisions"))]
21-
use std::sync::Arc;
22-
2320
use ahash::RandomState;
2421
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
2522
use arrow::array::*;
@@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
215212
// Hash each dictionary value once, and then use that computed
216213
// hash for each key value to avoid a potentially expensive
217214
// redundant hashing for large dictionary elements (e.g. strings)
218-
let dict_values = Arc::clone(array.values());
215+
let dict_values = array.values();
219216
let mut dict_hashes = vec![0; dict_values.len()];
220-
create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
217+
create_hashes([dict_values], random_state, &mut dict_hashes)?;
221218

222219
// combine hash for each index in values
223-
let dict_values = array.values();
224220
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
225221
if let Some(key) = key {
226222
let idx = key.as_usize();
@@ -308,11 +304,11 @@ fn hash_list_array<OffsetSize>(
308304
where
309305
OffsetSize: OffsetSizeTrait,
310306
{
311-
let values = Arc::clone(array.values());
307+
let values = array.values();
312308
let offsets = array.value_offsets();
313309
let nulls = array.nulls();
314310
let mut values_hashes = vec![0u64; values.len()];
315-
create_hashes(&[values], random_state, &mut values_hashes)?;
311+
create_hashes([values], random_state, &mut values_hashes)?;
316312
if let Some(nulls) = nulls {
317313
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
318314
if nulls.is_valid(i) {
@@ -339,11 +335,11 @@ fn hash_fixed_list_array(
339335
random_state: &RandomState,
340336
hashes_buffer: &mut [u64],
341337
) -> Result<()> {
342-
let values = Arc::clone(array.values());
338+
let values = array.values();
343339
let value_length = array.value_length() as usize;
344340
let nulls = array.nulls();
345341
let mut values_hashes = vec![0u64; values.len()];
346-
create_hashes(&[values], random_state, &mut values_hashes)?;
342+
create_hashes([values], random_state, &mut values_hashes)?;
347343
if let Some(nulls) = nulls {
348344
for i in 0..array.len() {
349345
if nulls.is_valid(i) {
@@ -366,83 +362,123 @@ fn hash_fixed_list_array(
366362
Ok(())
367363
}
368364

369-
/// Test version of `create_hashes` that produces the same value for
370-
/// all hashes (to test collisions)
371-
///
372-
/// See comments on `hashes_buffer` for more details
365+
/// Internal helper function that hashes a single array and either initializes or combines
366+
/// the hash values in the buffer.
367+
#[cfg(not(feature = "force_hash_collisions"))]
368+
fn hash_single_array(
369+
array: &dyn Array,
370+
random_state: &RandomState,
371+
hashes_buffer: &mut [u64],
372+
rehash: bool,
373+
) -> Result<()> {
374+
downcast_primitive_array! {
375+
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
376+
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
377+
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
378+
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
379+
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
380+
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
381+
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
382+
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
383+
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
384+
DataType::FixedSizeBinary(_) => {
385+
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
386+
hash_array(array, random_state, hashes_buffer, rehash)
387+
}
388+
DataType::Dictionary(_, _) => downcast_dictionary_array! {
389+
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
390+
_ => unreachable!()
391+
}
392+
DataType::Struct(_) => {
393+
let array = as_struct_array(array)?;
394+
hash_struct_array(array, random_state, hashes_buffer)?;
395+
}
396+
DataType::List(_) => {
397+
let array = as_list_array(array)?;
398+
hash_list_array(array, random_state, hashes_buffer)?;
399+
}
400+
DataType::LargeList(_) => {
401+
let array = as_large_list_array(array)?;
402+
hash_list_array(array, random_state, hashes_buffer)?;
403+
}
404+
DataType::Map(_, _) => {
405+
let array = as_map_array(array)?;
406+
hash_map_array(array, random_state, hashes_buffer)?;
407+
}
408+
DataType::FixedSizeList(_,_) => {
409+
let array = as_fixed_size_list_array(array)?;
410+
hash_fixed_list_array(array, random_state, hashes_buffer)?;
411+
}
412+
_ => {
413+
// This is internal because we should have caught this before.
414+
return _internal_err!(
415+
"Unsupported data type in hasher: {}",
416+
array.data_type()
417+
);
418+
}
419+
}
420+
Ok(())
421+
}
422+
423+
/// Test version of `hash_single_array` that forces all hashes to collide to zero.
373424
#[cfg(feature = "force_hash_collisions")]
374-
pub fn create_hashes<'a>(
375-
_arrays: &[ArrayRef],
425+
fn hash_single_array(
426+
_array: &dyn Array,
376427
_random_state: &RandomState,
377-
hashes_buffer: &'a mut Vec<u64>,
378-
) -> Result<&'a mut Vec<u64>> {
428+
hashes_buffer: &mut [u64],
429+
_rehash: bool,
430+
) -> Result<()> {
379431
for hash in hashes_buffer.iter_mut() {
380432
*hash = 0
381433
}
382-
Ok(hashes_buffer)
434+
Ok(())
435+
}
436+
437+
pub trait AsDynArray {
438+
fn as_dyn_array(&self) -> &dyn Array;
439+
}
440+
441+
impl AsDynArray for dyn Array {
442+
fn as_dyn_array(&self) -> &dyn Array {
443+
self
444+
}
445+
}
446+
447+
impl AsDynArray for &dyn Array {
448+
fn as_dyn_array(&self) -> &dyn Array {
449+
*self
450+
}
451+
}
452+
453+
impl AsDynArray for ArrayRef {
454+
fn as_dyn_array(&self) -> &dyn Array {
455+
self.as_ref()
456+
}
383457
}
384458

385-
/// Creates hash values for every row, based on the values in the
386-
/// columns.
459+
impl AsDynArray for &ArrayRef {
460+
fn as_dyn_array(&self) -> &dyn Array {
461+
self.as_ref()
462+
}
463+
}
464+
465+
/// Creates hash values for every row, based on the values in the columns.
387466
///
388467
/// The number of rows to hash is determined by `hashes_buffer.len()`.
389-
/// `hashes_buffer` should be pre-sized appropriately
390-
#[cfg(not(feature = "force_hash_collisions"))]
391-
pub fn create_hashes<'a>(
392-
arrays: &[ArrayRef],
468+
/// `hashes_buffer` should be pre-sized appropriately.
469+
pub fn create_hashes<'a, I, T>(
470+
arrays: I,
393471
random_state: &RandomState,
394472
hashes_buffer: &'a mut Vec<u64>,
395-
) -> Result<&'a mut Vec<u64>> {
396-
for (i, col) in arrays.iter().enumerate() {
397-
let array = col.as_ref();
473+
) -> Result<&'a mut Vec<u64>>
474+
where
475+
I: IntoIterator<Item = T>,
476+
T: AsDynArray,
477+
{
478+
for (i, array) in arrays.into_iter().enumerate() {
398479
// combine hashes with `combine_hashes` for all columns besides the first
399480
let rehash = i >= 1;
400-
downcast_primitive_array! {
401-
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
402-
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
403-
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
404-
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
405-
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
406-
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
407-
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
408-
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
409-
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
410-
DataType::FixedSizeBinary(_) => {
411-
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
412-
hash_array(array, random_state, hashes_buffer, rehash)
413-
}
414-
DataType::Dictionary(_, _) => downcast_dictionary_array! {
415-
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
416-
_ => unreachable!()
417-
}
418-
DataType::Struct(_) => {
419-
let array = as_struct_array(array)?;
420-
hash_struct_array(array, random_state, hashes_buffer)?;
421-
}
422-
DataType::List(_) => {
423-
let array = as_list_array(array)?;
424-
hash_list_array(array, random_state, hashes_buffer)?;
425-
}
426-
DataType::LargeList(_) => {
427-
let array = as_large_list_array(array)?;
428-
hash_list_array(array, random_state, hashes_buffer)?;
429-
}
430-
DataType::Map(_, _) => {
431-
let array = as_map_array(array)?;
432-
hash_map_array(array, random_state, hashes_buffer)?;
433-
}
434-
DataType::FixedSizeList(_,_) => {
435-
let array = as_fixed_size_list_array(array)?;
436-
hash_fixed_list_array(array, random_state, hashes_buffer)?;
437-
}
438-
_ => {
439-
// This is internal because we should have caught this before.
440-
return _internal_err!(
441-
"Unsupported data type in hasher: {}",
442-
col.data_type()
443-
);
444-
}
445-
}
481+
hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
446482
}
447483
Ok(hashes_buffer)
448484
}
@@ -465,7 +501,7 @@ mod tests {
465501
.collect::<Decimal128Array>()
466502
.with_precision_and_scale(20, 3)
467503
.unwrap();
468-
let array_ref = Arc::new(array);
504+
let array_ref: ArrayRef = Arc::new(array);
469505
let random_state = RandomState::with_seeds(0, 0, 0, 0);
470506
let hashes_buff = &mut vec![0; array_ref.len()];
471507
let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
@@ -478,15 +514,21 @@ mod tests {
478514
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
479515
let random_state = RandomState::with_seeds(0, 0, 0, 0);
480516
let hashes_buff = &mut vec![0; 0];
481-
let hashes = create_hashes(&[Arc::new(empty_array)], &random_state, hashes_buff)?;
517+
let hashes = create_hashes(
518+
&[Arc::new(empty_array) as ArrayRef],
519+
&random_state,
520+
hashes_buff,
521+
)?;
482522
assert_eq!(hashes, &Vec::<u64>::new());
483523
Ok(())
484524
}
485525

486526
#[test]
487527
fn create_hashes_for_float_arrays() -> Result<()> {
488-
let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
489-
let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
528+
let f32_arr: ArrayRef =
529+
Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
530+
let f64_arr: ArrayRef =
531+
Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
490532

491533
let random_state = RandomState::with_seeds(0, 0, 0, 0);
492534
let hashes_buff = &mut vec![0; f32_arr.len()];
@@ -514,8 +556,10 @@ mod tests {
514556
Some(b"Longer than 12 bytes string"),
515557
];
516558

517-
let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
518-
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());
559+
let binary_array: ArrayRef =
560+
Arc::new(binary.iter().cloned().collect::<$ARRAY>());
561+
let ref_array: ArrayRef =
562+
Arc::new(binary.iter().cloned().collect::<BinaryArray>());
519563

520564
let random_state = RandomState::with_seeds(0, 0, 0, 0);
521565

@@ -553,7 +597,7 @@ mod tests {
553597
#[test]
554598
fn create_hashes_fixed_size_binary() -> Result<()> {
555599
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
556-
let fixed_size_binary_array =
600+
let fixed_size_binary_array: ArrayRef =
557601
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());
558602

559603
let random_state = RandomState::with_seeds(0, 0, 0, 0);
@@ -580,8 +624,9 @@ mod tests {
580624
Some("Longer than 12 bytes string"),
581625
];
582626

583-
let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
584-
let dict_array = Arc::new(
627+
let string_array: ArrayRef =
628+
Arc::new(strings.iter().cloned().collect::<$ARRAY>());
629+
let dict_array: ArrayRef = Arc::new(
585630
strings
586631
.iter()
587632
.cloned()
@@ -629,8 +674,9 @@ mod tests {
629674
fn create_hashes_for_dict_arrays() {
630675
let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];
631676

632-
let string_array = Arc::new(strings.iter().cloned().collect::<StringArray>());
633-
let dict_array = Arc::new(
677+
let string_array: ArrayRef =
678+
Arc::new(strings.iter().cloned().collect::<StringArray>());
679+
let dict_array: ArrayRef = Arc::new(
634680
strings
635681
.iter()
636682
.cloned()
@@ -865,8 +911,9 @@ mod tests {
865911
let strings1 = [Some("foo"), None, Some("bar")];
866912
let strings2 = [Some("blarg"), Some("blah"), None];
867913

868-
let string_array = Arc::new(strings1.iter().cloned().collect::<StringArray>());
869-
let dict_array = Arc::new(
914+
let string_array: ArrayRef =
915+
Arc::new(strings1.iter().cloned().collect::<StringArray>());
916+
let dict_array: ArrayRef = Arc::new(
870917
strings2
871918
.iter()
872919
.cloned()
@@ -896,4 +943,52 @@ mod tests {
896943

897944
assert_ne!(one_col_hashes, two_col_hashes);
898945
}
946+
947+
#[test]
948+
fn test_create_hashes_from_arrays() {
949+
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
950+
let float_array: ArrayRef =
951+
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
952+
953+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
954+
let hashes_buff = &mut vec![0; int_array.len()];
955+
let hashes =
956+
create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
957+
assert_eq!(hashes.len(), 4,);
958+
}
959+
960+
#[test]
961+
fn test_create_hashes_from_dyn_arrays() {
962+
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
963+
let float_array: ArrayRef =
964+
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
965+
966+
// Verify that we can call create_hashes with only &dyn Array
967+
fn test(arr1: &dyn Array, arr2: &dyn Array) {
968+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
969+
let hashes_buff = &mut vec![0; arr1.len()];
970+
let hashes = create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
971+
assert_eq!(hashes.len(), 4,);
972+
}
973+
test(&*int_array, &*float_array);
974+
}
975+
976+
#[test]
977+
fn test_create_hashes_equivalence() {
978+
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
979+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
980+
981+
let mut hashes1 = vec![0; array.len()];
982+
create_hashes(
983+
&[Arc::clone(&array) as ArrayRef],
984+
&random_state,
985+
&mut hashes1,
986+
)
987+
.unwrap();
988+
989+
let mut hashes2 = vec![0; array.len()];
990+
create_hashes([array], &random_state, &mut hashes2).unwrap();
991+
992+
assert_eq!(hashes1, hashes2);
993+
}
899994
}

datafusion/common/src/scalar/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -878,10 +878,10 @@ impl Hash for ScalarValue {
878878

879879
fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
880880
let len = arr.len();
881-
let arrays = vec![arr];
882881
let hashes_buffer = &mut vec![0; len];
883882
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
884-
let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
883+
let hashes = create_hashes(&[arr], &random_state, hashes_buffer)
884+
.expect("hash_nested_array: failed to create row hashes");
885885
// Hash back to std::hash::Hasher
886886
hashes.hash(state);
887887
}

0 commit comments

Comments
 (0)