From cf78119e2463ab65e916f948f0eaebe7cef85eb3 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Wed, 8 Oct 2025 19:08:40 +0200 Subject: [PATCH 01/11] #17972 Restore case expr/expr optimisation while ensuring lazy evaluation --- datafusion/physical-expr/src/expressions/case.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 5409cfe8e7e4..b9cf6ff014b7 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -155,10 +155,7 @@ impl CaseExpr { && else_expr.as_ref().unwrap().as_any().is::() { EvalMethod::ScalarOrScalar - } else if when_then_expr.len() == 1 - && is_cheap_and_infallible(&(when_then_expr[0].1)) - && else_expr.as_ref().is_some_and(is_cheap_and_infallible) - { + } else if when_then_expr.len() == 1 && else_expr.is_some() { EvalMethod::ExpressionOrExpression } else { EvalMethod::NoExpression @@ -431,6 +428,15 @@ impl CaseExpr { _ => Cow::Owned(prep_null_mask_filter(when_value)), }; + let true_count = when_value.true_count(); + if true_count == batch.num_rows() { + // Avoid evaluate_selection when all rows are true + return self.when_then_expr[0].1.evaluate(batch); + } else if true_count == 0 { + // Avoid evaluate_selection when all rows are false/null + return self.else_expr.as_ref().unwrap().evaluate(batch); + } + let then_value = self.when_then_expr[0] .1 .evaluate_selection(batch, &when_value)? From b7826280ca2b0a8891cc2dfda7270b6c57a39358 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Thu, 9 Oct 2025 23:01:20 +0200 Subject: [PATCH 02/11] Avoid calling `PhysicalExpr::evaluate` from `PhysicalExpr::evaluate_selection` for empty selections. 
--- .../physical-expr-common/src/physical_expr.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 6f7c432c7582..f199dfc86fc0 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::utils::scatter; -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{make_builder, ArrayBuilder, ArrayRef, BooleanArray}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; @@ -97,11 +97,21 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { batch: &RecordBatch, selection: &BooleanArray, ) -> Result { - let tmp_batch = filter_record_batch(batch, selection)?; - - let tmp_result = self.evaluate(&tmp_batch)?; + let row_count = selection.true_count(); + + let tmp_result = if row_count == 0 { + // Do not call `evaluate` when the selection is empty. + // Otherwise, conditionally executing expressions like `case` may end up + // unintentionally evaluating fallible expressions like `1/0` which will trigger + // unexpected runtime errors. + let datatype = self.data_type(batch.schema_ref().as_ref())?; + ColumnarValue::Array(make_builder(&datatype, 0).finish()) + } else { + let tmp_batch = filter_record_batch(batch, selection)?; + self.evaluate(&tmp_batch)? + }; - if batch.num_rows() == tmp_batch.num_rows() { + if batch.num_rows() == row_count { // All values from the `selection` filter are true. 
Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { From eed9a34d09961bfb6d272b4beb1153e0bf485381 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 05:54:03 +0200 Subject: [PATCH 03/11] Make `PhysicalExpr::evaluate_selection` correctly handle empty input sets and all false filters --- .../physical-expr-common/src/physical_expr.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index f199dfc86fc0..42d1d912cccd 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -97,13 +97,18 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { batch: &RecordBatch, selection: &BooleanArray, ) -> Result { - let row_count = selection.true_count(); + let selection_count = selection.true_count(); - let tmp_result = if row_count == 0 { + let tmp_result = if batch.num_rows() == 0 || selection_count == batch.num_rows() { + // When evaluating the empty set we should simply delegate to `evaluate` + // Additionally, if the selection vector is entirely true, we can skip filtering + self.evaluate(batch)? + } else if selection_count == 0 { // Do not call `evaluate` when the selection is empty. - // Otherwise, conditionally executing expressions like `case` may end up - // unintentionally evaluating fallible expressions like `1/0` which will trigger - // unexpected runtime errors. + // We're already sure we're not evaluating over the empty set due to the previous check. + // Reducing a non-empty set to the empty set due to the selection vector and then + // calling `evaluate` with the empty set would yield a different answer when evaluating + // expressions without column references. 
let datatype = self.data_type(batch.schema_ref().as_ref())?; ColumnarValue::Array(make_builder(&datatype, 0).finish()) } else { @@ -111,7 +116,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { self.evaluate(&tmp_batch)? }; - if batch.num_rows() == row_count { + if batch.num_rows() == selection_count { // All values from the `selection` filter are true. Ok(tmp_result) } else if let ColumnarValue::Array(a) = tmp_result { From c8524d3565566b310ec0e6563ae236aea7c4687b Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 06:03:28 +0200 Subject: [PATCH 04/11] Reorganize code to avoid scatter codepath when using `evaluate` fast path. --- .../physical-expr-common/src/physical_expr.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 42d1d912cccd..d86a05df1d06 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -99,11 +99,13 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { ) -> Result { let selection_count = selection.true_count(); - let tmp_result = if batch.num_rows() == 0 || selection_count == batch.num_rows() { + if batch.num_rows() == 0 || selection_count == batch.num_rows() { // When evaluating the empty set we should simply delegate to `evaluate` // Additionally, if the selection vector is entirely true, we can skip filtering - self.evaluate(batch)? - } else if selection_count == 0 { + return self.evaluate(batch); + } + + let tmp_result = if selection_count == 0 { // Do not call `evaluate` when the selection is empty. // We're already sure we're not evaluating over the empty set due to the previous check.
// Reducing a non-empty set to the empty set due to the selection vector and then @@ -116,10 +118,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { self.evaluate(&tmp_batch)? }; - if batch.num_rows() == selection_count { - // All values from the `selection` filter are true. - Ok(tmp_result) - } else if let ColumnarValue::Array(a) = tmp_result { + if let ColumnarValue::Array(a) = tmp_result { scatter(selection, a.as_ref()).map(ColumnarValue::Array) } else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result { // When the scalar is true or false, skip the scatter process From f9f67c5f899435e9d742dc5b3bd025f770ff1097 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 06:13:15 +0200 Subject: [PATCH 05/11] Clarify comments in case --- datafusion/physical-expr/src/expressions/case.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index b9cf6ff014b7..e6762663b83a 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -428,12 +428,13 @@ impl CaseExpr { _ => Cow::Owned(prep_null_mask_filter(when_value)), }; + // For the true and false/null selection vectors, bypass `evaluate_selection` and merging + // results. This avoids materializing the array for the other branch which we will discard + // entirely anyway. 
let true_count = when_value.true_count(); if true_count == batch.num_rows() { - // Avoid evaluate_selection when all rows are true return self.when_then_expr[0].1.evaluate(batch); } else if true_count == 0 { - // Avoid evaluate_selection when all rows are false/null return self.else_expr.as_ref().unwrap().evaluate(batch); } From b849738e73d53aff4b1ef07f6eb0cd75e2478858 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 06:16:04 +0200 Subject: [PATCH 06/11] Move null handling after true count check. --- datafusion/physical-expr/src/expressions/case.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index e6762663b83a..d14146a20d8b 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -422,12 +422,6 @@ impl CaseExpr { ) })?; - // Treat 'NULL' as false value - let when_value = match when_value.null_count() { - 0 => Cow::Borrowed(when_value), - _ => Cow::Owned(prep_null_mask_filter(when_value)), - }; - // For the true and false/null selection vectors, bypass `evaluate_selection` and merging // results. This avoids materializing the array for the other branch which we will discard // entirely anyway. @@ -438,6 +432,12 @@ impl CaseExpr { return self.else_expr.as_ref().unwrap().evaluate(batch); } + // Treat 'NULL' as false value + let when_value = match when_value.null_count() { + 0 => Cow::Borrowed(when_value), + _ => Cow::Owned(prep_null_mask_filter(when_value)), + }; + let then_value = self.when_then_expr[0] .1 .evaluate_selection(batch, &when_value)? 
From efbd205ebe8baa042618c887a8759a0c8e482eb1 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 06:50:39 +0200 Subject: [PATCH 07/11] Tweaking comments --- .../physical-expr-common/src/physical_expr.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index d86a05df1d06..702b794b2c81 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -100,22 +100,20 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { let selection_count = selection.true_count(); if batch.num_rows() == 0 || selection_count == batch.num_rows() { - // When evaluating the empty set we should simply delegate to `evaluate` - // Additionally, if the selection vector is entirely true, we can skip filtering + // Skip filtering logic if possible return self.evaluate(batch); } let tmp_result = if selection_count == 0 { // Do not call `evaluate` when the selection is empty. - // We're already sure we're not evaluating over the empty set due to the previous check. - // Reducing a non-empty set to the empty set due to the selection vector and then - // calling `evaluate` with the empty set would yield a different answer when evaluating - // expressions without column references. + // When `evaluate_selection` is being used for conditional, lazy evaluation, + // evaluating an expression for a false selection vector may end up unintentionally + // evaluating a fallible expression. let datatype = self.data_type(batch.schema_ref().as_ref())?; ColumnarValue::Array(make_builder(&datatype, 0).finish()) } else { - let tmp_batch = filter_record_batch(batch, selection)?; - self.evaluate(&tmp_batch)? + let filtered_batch = filter_record_batch(batch, selection)?; + self.evaluate(&filtered_batch)? 
}; if let ColumnarValue::Array(a) = tmp_result { From 480a747091d5b2b265868d3fb42cc6fdbdefcba2 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 13:18:42 +0200 Subject: [PATCH 08/11] Add unit tests to help define the boundary case behaviour of evaluate_selection --- .../physical-expr-common/src/physical_expr.rs | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 702b794b2c81..32b4ce7ccce1 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -613,3 +613,174 @@ pub fn is_volatile(expr: &Arc) -> bool { .expect("infallible closure should not fail"); is_volatile } + +#[cfg(test)] +mod test { + use crate::physical_expr::PhysicalExpr; + use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch}; + use arrow::datatypes::{DataType, Schema}; + use datafusion_expr_common::columnar_value::ColumnarValue; + use std::fmt::{Display, Formatter}; + use std::sync::Arc; + + #[derive(Debug, PartialEq, Eq, Hash)] + struct TestExpr {} + + impl PhysicalExpr for TestExpr { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + Ok(Arc::new(Self {})) + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("TestExpr") + } + + fn data_type(&self, _schema: &Schema) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn nullable(&self, _schema: &Schema) -> datafusion_common::Result { + Ok(false) + } + + fn evaluate( + &self, + _batch: &RecordBatch, + ) -> datafusion_common::Result { + let data = vec![1; _batch.num_rows()]; + Ok(ColumnarValue::Array(Arc::new(Int64Array::from(data)))) + } + } + + impl Display for TestExpr { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result 
{ + self.fmt_sql(f) + } + } + + macro_rules! assert_arrays_eq { + ($EXPECTED: expr, $ACTUAL: expr, $MESSAGE: expr) => { + let expected = $EXPECTED.to_array(1).unwrap(); + let actual = $ACTUAL; + + let actual_array = actual.to_array(expected.len()).unwrap(); + let actual_ref = actual_array.as_ref(); + let expected_ref = expected.as_ref(); + assert!( + actual_ref == expected_ref, + "{}: expected: {:?}, actual: {:?}", + $MESSAGE, + $EXPECTED, + actual_ref + ); + }; + } + + fn test_evaluate_selection( + batch: &RecordBatch, + selection: &BooleanArray, + expected: &ColumnarValue, + ) { + let expr = TestExpr {}; + + // First check that the `evaluate_selection` is the expected one + let selection_result = expr.evaluate_selection(&batch, selection).unwrap(); + assert_eq!(expected.to_array(1).unwrap().len(), selection_result.to_array(1).unwrap().len(), "evaluate_selection should output row count should match input record batch"); + assert_arrays_eq!(expected, &selection_result, "evaluate_selection returned unexpected value"); + + // If we're selecting all rows, the result should be the same as calling `evaluate` + // with the full record batch. 
+ if (0..batch.num_rows()).all(|row_idx| row_idx < selection.len() && selection.value(row_idx)) { + let empty_result = expr.evaluate(&batch).unwrap(); + + assert_arrays_eq!(empty_result, &selection_result, "evaluate_selection does not match unfiltered evaluate result"); + } + } + + #[test] + pub fn test_evaluate_selection_with_empty_record_batch() { + test_evaluate_selection( + &RecordBatch::new_empty(Arc::new(Schema::empty())), + &BooleanArray::from(vec![false; 0]), + &ColumnarValue::Array(Arc::new(Int64Array::new_null(0))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_empty_record_batch_with_larger_false_selection() { + test_evaluate_selection( + &RecordBatch::new_empty(Arc::new(Schema::empty())), + &BooleanArray::from(vec![false; 10]), + &ColumnarValue::Array(Arc::new(Int64Array::new_null(0))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_empty_record_batch_with_larger_true_selection() { + test_evaluate_selection( + &RecordBatch::new_empty(Arc::new(Schema::empty())), + &BooleanArray::from(vec![true; 10]), + &ColumnarValue::Array(Arc::new(Int64Array::new_null(0))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_non_empty_record_batch() { + test_evaluate_selection( + unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + &BooleanArray::from(vec![true; 10]), + &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_false_selection( + ) { + test_evaluate_selection( + unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + &BooleanArray::from(vec![false; 20]), + &ColumnarValue::Array(Arc::new(Int64Array::from(vec![None; 10]))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_true_selection( + ) { + test_evaluate_selection( + unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + 
&BooleanArray::from(vec![true; 20]), + &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_false_selection( + ) { + test_evaluate_selection( + unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + &BooleanArray::from(vec![false; 5]), + &ColumnarValue::Array(Arc::new(Int64Array::from(vec![None; 10]))), + ); + } + + #[test] + pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_true_selection( + ) { + test_evaluate_selection( + unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + &BooleanArray::from(vec![true; 5]), + &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), + ); + } +} From c8186ecb41d2a9554779d01eb459c4f2bdb46764 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 14:49:21 +0200 Subject: [PATCH 09/11] Code polishing - Add extra comments - Use match for the scatter paragraph - Validate that the size of selection and batch match --- .../physical-expr-common/src/physical_expr.rs | 153 +++++++++++------- 1 file changed, 95 insertions(+), 58 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 32b4ce7ccce1..f77eac2f8b04 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -23,14 +23,14 @@ use std::sync::Arc; use crate::utils::scatter; -use arrow::array::{make_builder, ArrayBuilder, ArrayRef, BooleanArray}; +use arrow::array::{new_empty_array, ArrayRef, BooleanArray}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; +use 
datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::ExprProperties; @@ -90,48 +90,69 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { self.nullable(input_schema)?, ))) } - /// Evaluate an expression against a RecordBatch after first applying a - /// validity array + /// Evaluate an expression against a RecordBatch after first applying a validity array + /// + /// # Errors + /// + /// Returns an `Err` if the expression could not be evaluated or if the length of the + /// `selection` validity array and the number of rows in `batch` are not equal. fn evaluate_selection( &self, batch: &RecordBatch, selection: &BooleanArray, ) -> Result { + let row_count = batch.num_rows(); + if row_count != selection.len() { + return exec_err!("Selection array length does not match batch row count: {} != {row_count}", selection.len()); + } + let selection_count = selection.true_count(); - if batch.num_rows() == 0 || selection_count == batch.num_rows() { - // Skip filtering logic if possible + // First, check if we can avoid filtering altogether. + if selection_count == row_count { + // All values from the `selection` filter are true and match the input batch. + // No need to perform any filtering. return self.evaluate(batch); } - let tmp_result = if selection_count == 0 { + // Next, prepare the result array for each 'true' row in the selection vector. + let filtered_result = if selection_count == 0 { // Do not call `evaluate` when the selection is empty. - // When `evaluate_selection` is being used for conditional, lazy evaluation, - // evaluating an expression for a false selection vector may end up unintentionally - // evaluating a fallible expression. + // `evaluate_selection` is used to conditionally evaluate expressions.
+ // When the expression in question is fallible, evaluating it with an empty + // record batch may trigger a runtime error (e.g. division by zero). + // + // Instead, create an empty array matching the expected return type. let datatype = self.data_type(batch.schema_ref().as_ref())?; - ColumnarValue::Array(make_builder(&datatype, 0).finish()) + ColumnarValue::Array(new_empty_array(&datatype)) } else { + // If we reach this point, there's no other option than to filter the batch. + // This is a fairly costly operation since it requires creating partial copies + // (worst case of length `row_count - 1`) of all the arrays in the record batch. + // The resulting `filtered_batch` will contain `selection_count` rows. let filtered_batch = filter_record_batch(batch, selection)?; self.evaluate(&filtered_batch)? }; - if let ColumnarValue::Array(a) = tmp_result { - scatter(selection, a.as_ref()).map(ColumnarValue::Array) - } else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result { - // When the scalar is true or false, skip the scatter process - if let Some(v) = value { - if *v { - Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef)) + // Finally, scatter the filtered result array so that the indices match the input rows again. 
+ match &filtered_result { + ColumnarValue::Array(a) => { + scatter(selection, a.as_ref()).map(ColumnarValue::Array) + } + ColumnarValue::Scalar(ScalarValue::Boolean(value)) => { + // When the scalar is true or false, skip the scatter process + if let Some(v) = value { + if *v { + Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef)) + } else { + Ok(filtered_result) + } } else { - Ok(tmp_result) + let array = BooleanArray::from(vec![None; row_count]); + scatter(selection, &array).map(ColumnarValue::Array) } - } else { - let array = BooleanArray::from(vec![None; batch.num_rows()]); - scatter(selection, &array).map(ColumnarValue::Array) } - } else { - Ok(tmp_result) + ColumnarValue::Scalar(_) => Ok(filtered_result), } } @@ -631,21 +652,6 @@ mod test { self } - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> datafusion_common::Result> { - Ok(Arc::new(Self {})) - } - - fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str("TestExpr") - } - fn data_type(&self, _schema: &Schema) -> datafusion_common::Result { Ok(DataType::Int64) } @@ -661,6 +667,21 @@ mod test { let data = vec![1; _batch.num_rows()]; Ok(ColumnarValue::Array(Arc::new(Int64Array::from(data)))) } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + Ok(Arc::new(Self {})) + } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("TestExpr") + } } impl Display for TestExpr { @@ -674,9 +695,9 @@ mod test { let expected = $EXPECTED.to_array(1).unwrap(); let actual = $ACTUAL; - let actual_array = actual.to_array(expected.len()).unwrap(); - let actual_ref = actual_array.as_ref(); - let expected_ref = expected.as_ref(); + let actual_array = actual.to_array(expected.len()).unwrap(); + let actual_ref = actual_array.as_ref(); + let expected_ref = expected.as_ref(); assert!( actual_ref == expected_ref, 
"{}: expected: {:?}, actual: {:?}", @@ -696,18 +717,40 @@ mod test { // First check that the `evaluate_selection` is the expected one let selection_result = expr.evaluate_selection(&batch, selection).unwrap(); - assert_eq!(expected.to_array(1).unwrap().len(), selection_result.to_array(1).unwrap().len(), "evaluate_selection should output row count should match input record batch"); - assert_arrays_eq!(expected, &selection_result, "evaluate_selection returned unexpected value"); + assert_eq!( + expected.to_array(1).unwrap().len(), + selection_result.to_array(1).unwrap().len(), + "evaluate_selection should output row count should match input record batch" + ); + assert_arrays_eq!( + expected, + &selection_result, + "evaluate_selection returned unexpected value" + ); // If we're selecting all rows, the result should be the same as calling `evaluate` // with the full record batch. - if (0..batch.num_rows()).all(|row_idx| row_idx < selection.len() && selection.value(row_idx)) { + if (0..batch.num_rows()) + .all(|row_idx| row_idx < selection.len() && selection.value(row_idx)) + { let empty_result = expr.evaluate(&batch).unwrap(); - assert_arrays_eq!(empty_result, &selection_result, "evaluate_selection does not match unfiltered evaluate result"); + assert_arrays_eq!( + empty_result, + &selection_result, + "evaluate_selection does not match unfiltered evaluate result" + ); } } + fn test_evaluate_selection_error(batch: &RecordBatch, selection: &BooleanArray) { + let expr = TestExpr {}; + + // First check that the `evaluate_selection` is the expected one + let selection_result = expr.evaluate_selection(&batch, selection); + assert!(selection_result.is_err(), "evaluate_selection should fail"); + } + #[test] pub fn test_evaluate_selection_with_empty_record_batch() { test_evaluate_selection( @@ -719,19 +762,17 @@ mod test { #[test] pub fn test_evaluate_selection_with_empty_record_batch_with_larger_false_selection() { - test_evaluate_selection( + test_evaluate_selection_error( 
&RecordBatch::new_empty(Arc::new(Schema::empty())), &BooleanArray::from(vec![false; 10]), - &ColumnarValue::Array(Arc::new(Int64Array::new_null(0))), ); } #[test] pub fn test_evaluate_selection_with_empty_record_batch_with_larger_true_selection() { - test_evaluate_selection( + test_evaluate_selection_error( &RecordBatch::new_empty(Arc::new(Schema::empty())), &BooleanArray::from(vec![true; 10]), - &ColumnarValue::Array(Arc::new(Int64Array::new_null(0))), ); } @@ -747,40 +788,36 @@ mod test { #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_false_selection( ) { - test_evaluate_selection( + test_evaluate_selection_error( unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, &BooleanArray::from(vec![false; 20]), - &ColumnarValue::Array(Arc::new(Int64Array::from(vec![None; 10]))), ); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_true_selection( ) { - test_evaluate_selection( + test_evaluate_selection_error( unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, &BooleanArray::from(vec![true; 20]), - &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), ); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_false_selection( ) { - test_evaluate_selection( + test_evaluate_selection_error( unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, &BooleanArray::from(vec![false; 5]), - &ColumnarValue::Array(Arc::new(Int64Array::from(vec![None; 10]))), ); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_true_selection( ) { - test_evaluate_selection( + test_evaluate_selection_error( unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, &BooleanArray::from(vec![true; 5]), - &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), ); } } From aff3f18eff19504be1b131f5415f26b9e6fc7e66 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: 
Fri, 10 Oct 2025 17:36:52 +0200 Subject: [PATCH 10/11] Fix clippy errors --- datafusion/physical-expr-common/src/physical_expr.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index f77eac2f8b04..e5e7d6c00f08 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -662,9 +662,9 @@ mod test { fn evaluate( &self, - _batch: &RecordBatch, + batch: &RecordBatch, ) -> datafusion_common::Result { - let data = vec![1; _batch.num_rows()]; + let data = vec![1; batch.num_rows()]; Ok(ColumnarValue::Array(Arc::new(Int64Array::from(data)))) } @@ -716,7 +716,7 @@ mod test { let expr = TestExpr {}; // First check that the `evaluate_selection` is the expected one - let selection_result = expr.evaluate_selection(&batch, selection).unwrap(); + let selection_result = expr.evaluate_selection(batch, selection).unwrap(); assert_eq!( expected.to_array(1).unwrap().len(), selection_result.to_array(1).unwrap().len(), @@ -733,7 +733,7 @@ mod test { if (0..batch.num_rows()) .all(|row_idx| row_idx < selection.len() && selection.value(row_idx)) { - let empty_result = expr.evaluate(&batch).unwrap(); + let empty_result = expr.evaluate(batch).unwrap(); assert_arrays_eq!( empty_result, @@ -747,7 +747,7 @@ mod test { let expr = TestExpr {}; // First check that the `evaluate_selection` is the expected one - let selection_result = expr.evaluate_selection(&batch, selection); + let selection_result = expr.evaluate_selection(batch, selection); assert!(selection_result.is_err(), "evaluate_selection should fail"); } From 99a32fcde314fbb3f9ca45d95f5aaf933abb9719 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 10 Oct 2025 18:41:11 +0200 Subject: [PATCH 11/11] Add additional case SLTs --- datafusion/sqllogictest/test_files/case.slt | 27 +++++++++++++++++++++ 1 file changed, 27 
insertions(+) diff --git a/datafusion/sqllogictest/test_files/case.slt b/datafusion/sqllogictest/test_files/case.slt index 69f80f459394..9bc1f83ed119 100644 --- a/datafusion/sqllogictest/test_files/case.slt +++ b/datafusion/sqllogictest/test_files/case.slt @@ -467,6 +467,7 @@ FROM t; ---- [{foo: blarg}] +# mix of then and else query II SELECT v, CASE WHEN v != 0 THEN 10/v ELSE 42 END FROM (VALUES (0), (1), (2)) t(v) ---- @@ -474,12 +475,38 @@ SELECT v, CASE WHEN v != 0 THEN 10/v ELSE 42 END FROM (VALUES (0), (1), (2)) t(v 1 10 2 5 +# when expression is always false, then branch should never be evaluated query II SELECT v, CASE WHEN v < 0 THEN 10/0 ELSE 1 END FROM (VALUES (1), (2)) t(v) ---- 1 1 2 1 +# when expression is always true, else branch should never be evaluated +query II +SELECT v, CASE WHEN v > 0 THEN 1 ELSE 10/0 END FROM (VALUES (1), (2)) t(v) +---- +1 1 +2 1 + + +# lazy evaluation of multiple when branches, else branch should never be evaluated +query II +SELECT v, CASE WHEN v == 1 THEN -1 WHEN v == 2 THEN -2 WHEN v == 3 THEN -3 ELSE 10/0 END FROM (VALUES (1), (2), (3)) t(v) +---- +1 -1 +2 -2 +3 -3 + +# covers the InfallibleExprOrNull evaluation strategy +query II +SELECT v, CASE WHEN v THEN 1 END FROM (VALUES (1), (2), (3), (NULL)) t(v) +---- +1 1 +2 1 +3 1 +NULL NULL + statement ok drop table t