Skip to content

Commit a899ca0

Browse files
adriangbalamb
andauthored
Refactor create_hashes to accept array references (#18448)
## Background This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393. There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own: - (This PR): #18448 - #18449 (depends on #18448) - #18451 ## Changes in this PR Change create_hashes and related functions to work with &dyn Array references instead of requiring ArrayRef (Arc-wrapped arrays). This avoids unnecessary Arc::clone() calls and enables calls that only have an &dyn Array to use the hashing utilities. - Add create_hashes_from_arrays(&[&dyn Array]) function - Refactor hash_dictionary, hash_list_array, hash_fixed_list_array to use references instead of cloning - Extract hash_single_array() helper for common logic --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent f32984b commit a899ca0

File tree

5 files changed

+200
-113
lines changed

5 files changed

+200
-113
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 191 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
//! Functionality used both on logical and physical plans
1919
20-
#[cfg(not(feature = "force_hash_collisions"))]
21-
use std::sync::Arc;
22-
2320
use ahash::RandomState;
2421
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
2522
use arrow::array::*;
@@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
215212
// Hash each dictionary value once, and then use that computed
216213
// hash for each key value to avoid a potentially expensive
217214
// redundant hashing for large dictionary elements (e.g. strings)
218-
let dict_values = Arc::clone(array.values());
215+
let dict_values = array.values();
219216
let mut dict_hashes = vec![0; dict_values.len()];
220-
create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
217+
create_hashes([dict_values], random_state, &mut dict_hashes)?;
221218

222219
// combine hash for each index in values
223-
let dict_values = array.values();
224220
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
225221
if let Some(key) = key {
226222
let idx = key.as_usize();
@@ -308,11 +304,11 @@ fn hash_list_array<OffsetSize>(
308304
where
309305
OffsetSize: OffsetSizeTrait,
310306
{
311-
let values = Arc::clone(array.values());
307+
let values = array.values();
312308
let offsets = array.value_offsets();
313309
let nulls = array.nulls();
314310
let mut values_hashes = vec![0u64; values.len()];
315-
create_hashes(&[values], random_state, &mut values_hashes)?;
311+
create_hashes([values], random_state, &mut values_hashes)?;
316312
if let Some(nulls) = nulls {
317313
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
318314
if nulls.is_valid(i) {
@@ -339,11 +335,11 @@ fn hash_fixed_list_array(
339335
random_state: &RandomState,
340336
hashes_buffer: &mut [u64],
341337
) -> Result<()> {
342-
let values = Arc::clone(array.values());
338+
let values = array.values();
343339
let value_length = array.value_length() as usize;
344340
let nulls = array.nulls();
345341
let mut values_hashes = vec![0u64; values.len()];
346-
create_hashes(&[values], random_state, &mut values_hashes)?;
342+
create_hashes([values], random_state, &mut values_hashes)?;
347343
if let Some(nulls) = nulls {
348344
for i in 0..array.len() {
349345
if nulls.is_valid(i) {
@@ -366,83 +362,132 @@ fn hash_fixed_list_array(
366362
Ok(())
367363
}
368364

369-
/// Test version of `create_hashes` that produces the same value for
370-
/// all hashes (to test collisions)
371-
///
372-
/// See comments on `hashes_buffer` for more details
365+
/// Internal helper function that hashes a single array and either initializes or combines
366+
/// the hash values in the buffer.
367+
#[cfg(not(feature = "force_hash_collisions"))]
368+
fn hash_single_array(
369+
array: &dyn Array,
370+
random_state: &RandomState,
371+
hashes_buffer: &mut [u64],
372+
rehash: bool,
373+
) -> Result<()> {
374+
downcast_primitive_array! {
375+
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
376+
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
377+
DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
378+
DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
379+
DataType::Utf8View => hash_array(&as_string_view_array(array)?, random_state, hashes_buffer, rehash),
380+
DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
381+
DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
382+
DataType::BinaryView => hash_array(&as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
383+
DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
384+
DataType::FixedSizeBinary(_) => {
385+
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
386+
hash_array(&array, random_state, hashes_buffer, rehash)
387+
}
388+
DataType::Dictionary(_, _) => downcast_dictionary_array! {
389+
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
390+
_ => unreachable!()
391+
}
392+
DataType::Struct(_) => {
393+
let array = as_struct_array(array)?;
394+
hash_struct_array(array, random_state, hashes_buffer)?;
395+
}
396+
DataType::List(_) => {
397+
let array = as_list_array(array)?;
398+
hash_list_array(array, random_state, hashes_buffer)?;
399+
}
400+
DataType::LargeList(_) => {
401+
let array = as_large_list_array(array)?;
402+
hash_list_array(array, random_state, hashes_buffer)?;
403+
}
404+
DataType::Map(_, _) => {
405+
let array = as_map_array(array)?;
406+
hash_map_array(array, random_state, hashes_buffer)?;
407+
}
408+
DataType::FixedSizeList(_,_) => {
409+
let array = as_fixed_size_list_array(array)?;
410+
hash_fixed_list_array(array, random_state, hashes_buffer)?;
411+
}
412+
_ => {
413+
// This is internal because we should have caught this before.
414+
return _internal_err!(
415+
"Unsupported data type in hasher: {}",
416+
array.data_type()
417+
);
418+
}
419+
}
420+
Ok(())
421+
}
422+
423+
/// Test version of `hash_single_array` that forces all hashes to collide to zero.
373424
#[cfg(feature = "force_hash_collisions")]
374-
pub fn create_hashes<'a>(
375-
_arrays: &[ArrayRef],
425+
fn hash_single_array(
426+
_array: &dyn Array,
376427
_random_state: &RandomState,
377-
hashes_buffer: &'a mut Vec<u64>,
378-
) -> Result<&'a mut Vec<u64>> {
428+
hashes_buffer: &mut [u64],
429+
_rehash: bool,
430+
) -> Result<()> {
379431
for hash in hashes_buffer.iter_mut() {
380432
*hash = 0
381433
}
382-
Ok(hashes_buffer)
434+
Ok(())
435+
}
436+
437+
/// Something that can be returned as a `&dyn Array`.
438+
///
439+
/// We want `create_hashes` to accept either `&dyn Array` or `ArrayRef`,
440+
/// and this seems the best way to do so.
441+
///
442+
/// We tried having it accept `AsRef<dyn Array>`
443+
/// but that is not implemented for and cannot be implemented for
444+
/// `&dyn Array` so callers that have the latter would not be able
445+
/// to call `create_hashes` directly. This shim trait makes it possible.
446+
pub trait AsDynArray {
447+
fn as_dyn_array(&self) -> &dyn Array;
448+
}
449+
450+
impl AsDynArray for dyn Array {
451+
fn as_dyn_array(&self) -> &dyn Array {
452+
self
453+
}
454+
}
455+
456+
impl AsDynArray for &dyn Array {
457+
fn as_dyn_array(&self) -> &dyn Array {
458+
*self
459+
}
460+
}
461+
462+
impl AsDynArray for ArrayRef {
463+
fn as_dyn_array(&self) -> &dyn Array {
464+
self.as_ref()
465+
}
383466
}
384467

385-
/// Creates hash values for every row, based on the values in the
386-
/// columns.
468+
impl AsDynArray for &ArrayRef {
469+
fn as_dyn_array(&self) -> &dyn Array {
470+
self.as_ref()
471+
}
472+
}
473+
474+
/// Creates hash values for every row, based on the values in the columns.
387475
///
388476
/// The number of rows to hash is determined by `hashes_buffer.len()`.
389-
/// `hashes_buffer` should be pre-sized appropriately
390-
#[cfg(not(feature = "force_hash_collisions"))]
391-
pub fn create_hashes<'a>(
392-
arrays: &[ArrayRef],
477+
/// `hashes_buffer` should be pre-sized appropriately.
478+
pub fn create_hashes<'a, I, T>(
479+
arrays: I,
393480
random_state: &RandomState,
394481
hashes_buffer: &'a mut Vec<u64>,
395-
) -> Result<&'a mut Vec<u64>> {
396-
for (i, col) in arrays.iter().enumerate() {
397-
let array = col.as_ref();
482+
) -> Result<&'a mut Vec<u64>>
483+
where
484+
I: IntoIterator<Item = T>,
485+
T: AsDynArray,
486+
{
487+
for (i, array) in arrays.into_iter().enumerate() {
398488
// combine hashes with `combine_hashes` for all columns besides the first
399489
let rehash = i >= 1;
400-
downcast_primitive_array! {
401-
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
402-
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
403-
DataType::Boolean => hash_array(&as_boolean_array(array)?, random_state, hashes_buffer, rehash),
404-
DataType::Utf8 => hash_array(&as_string_array(array)?, random_state, hashes_buffer, rehash),
405-
DataType::Utf8View => hash_array(&as_string_view_array(array)?, random_state, hashes_buffer, rehash),
406-
DataType::LargeUtf8 => hash_array(&as_largestring_array(array), random_state, hashes_buffer, rehash),
407-
DataType::Binary => hash_array(&as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
408-
DataType::BinaryView => hash_array(&as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
409-
DataType::LargeBinary => hash_array(&as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
410-
DataType::FixedSizeBinary(_) => {
411-
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
412-
hash_array(&array, random_state, hashes_buffer, rehash)
413-
}
414-
DataType::Dictionary(_, _) => downcast_dictionary_array! {
415-
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
416-
_ => unreachable!()
417-
}
418-
DataType::Struct(_) => {
419-
let array = as_struct_array(array)?;
420-
hash_struct_array(array, random_state, hashes_buffer)?;
421-
}
422-
DataType::List(_) => {
423-
let array = as_list_array(array)?;
424-
hash_list_array(array, random_state, hashes_buffer)?;
425-
}
426-
DataType::LargeList(_) => {
427-
let array = as_large_list_array(array)?;
428-
hash_list_array(array, random_state, hashes_buffer)?;
429-
}
430-
DataType::Map(_, _) => {
431-
let array = as_map_array(array)?;
432-
hash_map_array(array, random_state, hashes_buffer)?;
433-
}
434-
DataType::FixedSizeList(_,_) => {
435-
let array = as_fixed_size_list_array(array)?;
436-
hash_fixed_list_array(array, random_state, hashes_buffer)?;
437-
}
438-
_ => {
439-
// This is internal because we should have caught this before.
440-
return _internal_err!(
441-
"Unsupported data type in hasher: {}",
442-
col.data_type()
443-
);
444-
}
445-
}
490+
hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
446491
}
447492
Ok(hashes_buffer)
448493
}
@@ -465,7 +510,7 @@ mod tests {
465510
.collect::<Decimal128Array>()
466511
.with_precision_and_scale(20, 3)
467512
.unwrap();
468-
let array_ref = Arc::new(array);
513+
let array_ref: ArrayRef = Arc::new(array);
469514
let random_state = RandomState::with_seeds(0, 0, 0, 0);
470515
let hashes_buff = &mut vec![0; array_ref.len()];
471516
let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
@@ -478,15 +523,21 @@ mod tests {
478523
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(), 1).finish();
479524
let random_state = RandomState::with_seeds(0, 0, 0, 0);
480525
let hashes_buff = &mut vec![0; 0];
481-
let hashes = create_hashes(&[Arc::new(empty_array)], &random_state, hashes_buff)?;
526+
let hashes = create_hashes(
527+
&[Arc::new(empty_array) as ArrayRef],
528+
&random_state,
529+
hashes_buff,
530+
)?;
482531
assert_eq!(hashes, &Vec::<u64>::new());
483532
Ok(())
484533
}
485534

486535
#[test]
487536
fn create_hashes_for_float_arrays() -> Result<()> {
488-
let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
489-
let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
537+
let f32_arr: ArrayRef =
538+
Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
539+
let f64_arr: ArrayRef =
540+
Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
490541

491542
let random_state = RandomState::with_seeds(0, 0, 0, 0);
492543
let hashes_buff = &mut vec![0; f32_arr.len()];
@@ -514,8 +565,10 @@ mod tests {
514565
Some(b"Longer than 12 bytes string"),
515566
];
516567

517-
let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>());
518-
let ref_array = Arc::new(binary.iter().cloned().collect::<BinaryArray>());
568+
let binary_array: ArrayRef =
569+
Arc::new(binary.iter().cloned().collect::<$ARRAY>());
570+
let ref_array: ArrayRef =
571+
Arc::new(binary.iter().cloned().collect::<BinaryArray>());
519572

520573
let random_state = RandomState::with_seeds(0, 0, 0, 0);
521574

@@ -553,7 +606,7 @@ mod tests {
553606
#[test]
554607
fn create_hashes_fixed_size_binary() -> Result<()> {
555608
let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]];
556-
let fixed_size_binary_array =
609+
let fixed_size_binary_array: ArrayRef =
557610
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());
558611

559612
let random_state = RandomState::with_seeds(0, 0, 0, 0);
@@ -580,8 +633,9 @@ mod tests {
580633
Some("Longer than 12 bytes string"),
581634
];
582635

583-
let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>());
584-
let dict_array = Arc::new(
636+
let string_array: ArrayRef =
637+
Arc::new(strings.iter().cloned().collect::<$ARRAY>());
638+
let dict_array: ArrayRef = Arc::new(
585639
strings
586640
.iter()
587641
.cloned()
@@ -629,8 +683,9 @@ mod tests {
629683
fn create_hashes_for_dict_arrays() {
630684
let strings = [Some("foo"), None, Some("bar"), Some("foo"), None];
631685

632-
let string_array = Arc::new(strings.iter().cloned().collect::<StringArray>());
633-
let dict_array = Arc::new(
686+
let string_array: ArrayRef =
687+
Arc::new(strings.iter().cloned().collect::<StringArray>());
688+
let dict_array: ArrayRef = Arc::new(
634689
strings
635690
.iter()
636691
.cloned()
@@ -865,8 +920,9 @@ mod tests {
865920
let strings1 = [Some("foo"), None, Some("bar")];
866921
let strings2 = [Some("blarg"), Some("blah"), None];
867922

868-
let string_array = Arc::new(strings1.iter().cloned().collect::<StringArray>());
869-
let dict_array = Arc::new(
923+
let string_array: ArrayRef =
924+
Arc::new(strings1.iter().cloned().collect::<StringArray>());
925+
let dict_array: ArrayRef = Arc::new(
870926
strings2
871927
.iter()
872928
.cloned()
@@ -896,4 +952,52 @@ mod tests {
896952

897953
assert_ne!(one_col_hashes, two_col_hashes);
898954
}
955+
956+
#[test]
957+
fn test_create_hashes_from_arrays() {
958+
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
959+
let float_array: ArrayRef =
960+
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
961+
962+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
963+
let hashes_buff = &mut vec![0; int_array.len()];
964+
let hashes =
965+
create_hashes(&[int_array, float_array], &random_state, hashes_buff).unwrap();
966+
assert_eq!(hashes.len(), 4,);
967+
}
968+
969+
#[test]
970+
fn test_create_hashes_from_dyn_arrays() {
971+
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
972+
let float_array: ArrayRef =
973+
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
974+
975+
// Verify that we can call create_hashes with only &dyn Array
976+
fn test(arr1: &dyn Array, arr2: &dyn Array) {
977+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
978+
let hashes_buff = &mut vec![0; arr1.len()];
979+
let hashes = create_hashes([arr1, arr2], &random_state, hashes_buff).unwrap();
980+
assert_eq!(hashes.len(), 4,);
981+
}
982+
test(&*int_array, &*float_array);
983+
}
984+
985+
#[test]
986+
fn test_create_hashes_equivalence() {
987+
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
988+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
989+
990+
let mut hashes1 = vec![0; array.len()];
991+
create_hashes(
992+
&[Arc::clone(&array) as ArrayRef],
993+
&random_state,
994+
&mut hashes1,
995+
)
996+
.unwrap();
997+
998+
let mut hashes2 = vec![0; array.len()];
999+
create_hashes([array], &random_state, &mut hashes2).unwrap();
1000+
1001+
assert_eq!(hashes1, hashes2);
1002+
}
8991003
}

0 commit comments

Comments
 (0)