Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions vortex-duckdb/src/exporter/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ use vortex::encodings::dict::DictArray;
use vortex::error::VortexResult;
use vortex::{Array, ToCanonical};

use crate::duckdb::{SelectionVector, Vector};
use crate::duckdb::{LogicalType, SelectionVector, Vector};
use crate::exporter::cache::ConversionCache;
use crate::exporter::{ColumnExporter, constant, new_array_exporter};

struct DictExporter<I: NativePType> {
// Store the dictionary values once and export the same dictionary with each codes chunk.
values_vector: Arc<Mutex<Vector>>, // NOTE(ngates): not actually flat...
logical_type: LogicalType,
values_len: u32,
codes: PrimitiveArray,
codes_type: PhantomData<I>,
Expand All @@ -33,6 +34,7 @@ pub(crate) fn new_exporter(
) -> VortexResult<Box<dyn ColumnExporter>> {
// Grab the cache dictionary values.
let values = array.values();
let logical_type: LogicalType = values.dtype().try_into()?;
if let Some(constant) = values.as_opt::<ConstantVTable>() {
return constant::new_exporter_with_mask(
&ConstantArray::new(constant.scalar().clone(), array.codes().len()),
Expand All @@ -53,9 +55,19 @@ pub(crate) fn new_exporter(
Some(vector) => vector,
None => {
// Create a new DuckDB vector for the values.
let mut vector = Vector::with_capacity(values.dtype().try_into()?, values.len());
let mut vector = Vector::with_capacity(logical_type.clone(), values.len());
new_array_exporter(values, cache)?.export(0, values.len(), &mut vector)?;

// This is a bit of a hack, but we need to turn the values vector into a dictionary-typed
// vector, on which we can later set different selection vectors.
// If this is not done here, the threads will race to convert the values into a dictionary.
Vector::with_capacity(logical_type.clone(), 0).dictionary(
&vector,
values.len(),
&SelectionVector::with_capacity(0),
0,
);

let vector = Arc::new(Mutex::new(vector));
cache
.values_cache
Expand All @@ -65,10 +77,25 @@ pub(crate) fn new_exporter(
}
};

// let new_values_vector = {
// let values_vector = values_vector.lock();
// let mut new_values_vector = Vector::new(logical_type.clone());
// // Shares the underlying data which determines the vectors length.
// new_values_vector.reference(&values_vector);
// Vector::with_capacity(values_vector.logical_type(), 0).dictionary(
// &new_values_vector,
// values.len(),
// &SelectionVector::with_capacity(0),
// 0,
// );
// Arc::new(Mutex::new(new_values_vector))
// };

let codes = array.codes().to_primitive();
match_each_integer_ptype!(codes.ptype(), |I| {
Ok(Box::new(DictExporter {
values_vector,
logical_type,
values_len: values.len().as_u32(),
codes,
codes_type: PhantomData::<I>,
Expand All @@ -84,30 +111,29 @@ impl<I: NativePType + AsPrimitive<u32>> ColumnExporter for DictExporter<I> {
let mut sel_vec = SelectionVector::with_capacity(len);
let mut_sel_vec = unsafe { sel_vec.as_slice_mut(len) };
for (dst, src) in mut_sel_vec.iter_mut().zip(
// FIXME(joe): we ignore nullability in codes, fix with a specific null value in the values vector.
self.codes.as_slice::<I>()[offset..offset + len]
.iter()
.map(|v| v.as_()),
) {
*dst = src
}

// DuckDB requires the value vector which references the data to be
// unique. Otherwise, DuckDB races on the values vector passed to the
// dictionary.
let new_values_vector = {
let values_vector = self.values_vector.lock();
let mut new_values_vector = Vector::new(values_vector.logical_type());
// Shares the underlying data which determines the vectors length.
new_values_vector.reference(&values_vector);
new_values_vector
};

vector.dictionary(&new_values_vector, self.values_len as usize, &sel_vec, len);

// Use a unique id for each dictionary data array -- telling DuckDB that
// the dict values vector is the same, so it can reuse the hash in a join.
vector.set_dictionary_id(format!("{}-{}", self.cache_id, self.value_id));
vector.set_dictionary_len(self.values_len);
{
let values = self.values_vector.lock();
let mut other = Vector::new(self.logical_type.clone());
other.reference(&*values);
vector.dictionary(
&other,
self.values_len as usize,
&sel_vec,
len,
);

// Use a unique id for each dictionary data array -- informing DuckDB that the dict values
// vector is the same, so that it can reuse the hash in a join.
vector.set_dictionary_id(format!("{}-{}", self.cache_id, self.value_id));
}

Ok(())
}
Expand Down
Loading