Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use crate::utils::scatter;

use arrow::array::{ArrayRef, BooleanArray};
use arrow::array::{make_builder, ArrayBuilder, ArrayRef, BooleanArray};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -97,14 +97,26 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
batch: &RecordBatch,
selection: &BooleanArray,
) -> Result<ColumnarValue> {
let tmp_batch = filter_record_batch(batch, selection)?;
let selection_count = selection.true_count();

let tmp_result = self.evaluate(&tmp_batch)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of new code which repeats logic of filter_record_batch.
What if we just changed this line only?

let tmp_result =  if tmp_batch.is_empty {
  // Do not call `evaluate` when the selection is empty.
            // When `evaluate_selection` is being used for conditional, lazy evaluation,
            // evaluating an expression for a false selection vector may end up unintentionally
            // evaluating a fallible expression.
            let datatype = self.data_type(batch.schema_ref().as_ref())?;
            ColumnarValue::Array(make_builder(&datatype, 0).finish())
} else {
  self.evaluate(&tmp_batch)?;
}

Note how this does not inspect selection / selection_count directly, leveraging the work done by filter_record_batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing it, but I don't see the overlap with filter_record_batch. AFAICT there are no checks to avoid creating a new record batch. What this code is doing is preparing an empty result value while filter_record_batch has optimised code to an empty record batch if the filter is all-false.

if batch.num_rows() == 0 || selection_count == batch.num_rows() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if batch.num_rows() == 0 || selection_count == batch.num_rows() {
if selection_count == batch.num_rows() {

(i'd assume batch.num_rows() == 0 implies that also selection_count == 0. If it is not the case, we have a more permissive if condition, but requires a code comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no assertion for this in place and as far as I can tell (but I did not verify yet), you can call evaluate_selection today with a mismatch between batch.num_rows() and selection.len() and it will do something. I haven't tested this yet, so I'm not 100% sure what the outcome would be.
I'll write an extra unit test to cover this.

The idea here is to avoid any extra work whatsoever in the trivial cases. Not sure what a useful, non-trivial comment for that would be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb @findepi I've added some unit tests to help define the behaviour of evaluate_selection. Some are currently still failing. Could you guys take a look at the tests to see if what they're asserting is correct?

The issues are all related to record batch / selection vector size mismatches. I don't think the current behaviour makes sense tbh (which is also present on main). I would expect to either get an error or as many output rows as there were input rows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've taken the liberty of adding the size mismatch check and returning an execution error in case of mismatch. Within the DataFusion library case is the only client of this API and for that this change should be fine. The behaviour in case of mismatch was pretty weird, so I doubt people would be making active use of this. You never know of course. I'll add this to the user visible changes list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a couple of tests guide by code coverage.expr_or_expr is definitely well covered.

// Skip filtering logic if possible
return self.evaluate(batch);
}

if batch.num_rows() == tmp_batch.num_rows() {
// All values from the `selection` filter are true.
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
let tmp_result = if selection_count == 0 {
// Do not call `evaluate` when the selection is empty.
// When `evaluate_selection` is being used for conditional, lazy evaluation,
// evaluating an expression for a false selection vector may end up unintentionally
// evaluating a fallible expression.
let datatype = self.data_type(batch.schema_ref().as_ref())?;
ColumnarValue::Array(make_builder(&datatype, 0).finish())
} else {
let filtered_batch = filter_record_batch(batch, selection)?;
self.evaluate(&filtered_batch)?
};

if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result {
// When the scalar is true or false, skip the scatter process
Expand Down
15 changes: 11 additions & 4 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ impl CaseExpr {
&& else_expr.as_ref().unwrap().as_any().is::<Literal>()
{
EvalMethod::ScalarOrScalar
} else if when_then_expr.len() == 1
&& is_cheap_and_infallible(&(when_then_expr[0].1))
&& else_expr.as_ref().is_some_and(is_cheap_and_infallible)
{
} else if when_then_expr.len() == 1 && else_expr.is_some() {
EvalMethod::ExpressionOrExpression
} else {
EvalMethod::NoExpression
Expand Down Expand Up @@ -425,6 +422,16 @@ impl CaseExpr {
)
})?;

// For the true and false/null selection vectors, bypass `evaluate_selection` and merging
// results. This avoids materializing the array for the other branch which we will discard
// entirely anyway.
let true_count = when_value.true_count();
if true_count == batch.num_rows() {
return self.when_then_expr[0].1.evaluate(batch);
} else if true_count == 0 {
return self.else_expr.as_ref().unwrap().evaluate(batch);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when -- if evaluate_selection is a problem, shouldn't we fix evaluate_selection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when

Yes, that's correct. There's no way to avoid that.

This particular bit of code is both an optimisation and a correctness thing.

From a performance point of view, we already know the selection vector is redundant, so there's really no point in calling evaluate_selection.

For correctness, what's being avoid here is calling either then or else with a selection vector that will result in an empty record batch after filtering. We could add similar checks in evaluate_selection to prevent evaluating the downstream expression for empty record batches as well. Its current contract requires it to return an array with the same length as the unfiltered input batch though. You can't avoid having to create an all-nulls array then.

Copy link
Member

@findepi findepi Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when

Yes, that's correct. There's no way to avoid that.

isn't this reintroducing the bug that was fixed in #15384, just in a big more complex wrapping?

Copy link
Contributor Author

@pepijnve pepijnve Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this reintroducing the bug that was fixed in #15384, just in a big more complex wrapping?

No, why do you think that's the case? If you write case foo is not null then foo else 1/0 end and foo happens to be NULL, what do you expect to happen?

The original issue was that in the example above for a single row with a non-null foo, the code was evaluating the then branch with [true] as selection vector and the else branch with [false]. The latter was passed to evaluate_selection which then filters the record batch down to an empty record batch and then calls the else expression with that record batch. For an expression like 1/0, you end up getting executing that division anyway even though the result would be discarded.

There are two ways to fix this:

  1. don't evaluate binary expressions and literals for empty batches (as you had suggested in a comment earlier I believe)
  2. don't call evaluate with empty input batches

The earlier fix had the effect of 2. as well, just in a less explicit way. The fix here does the same but adds the necessary checks in the explicit expr/expr code path. The code that's being seen as an optimisation is intended to prevent calling evaluate_selection with an all-false selection vector.

Copy link
Contributor Author

@pepijnve pepijnve Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I woke up early this morning to the realisation that the actual bug was a subtlety in the implementation of evaluate_selection. There's a difference between calling it with the empty set vs a non-empty set and a false selection vector. The implementation was actually treating both cases identically which can cause a spurious row to get materialised. I've pushed a correction for this and tweaked the comments in the code a bit.

I believe this properly addresses the original evaluation problem. All SQL logic tests pass even when commenting out the optimisation for true and false selection vectors in expr_or_expr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the change in evaluate_selection
Now that evaluate_selection is changed, do we need those lines here?

(They look nice but my only concern is additional code complexity which is harder to cover with SLT tests. Now that we have branching here, we should have a bunch of SLT cases that clearly exercise all-true, all-false, some-true situations.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, more SLTs! I will add some for the various cases you mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that evaluate_selection is changed, do we need those lines here?

@findepi Yes I would prefer to keep these early outs since they're pretty trivial and I think they're appropriate for the expr_or_expr function since the knowledge that it's "then or else" is located here. evaluate_selection cannot be implemented with awareness of this particular usage pattern.

Calling evaluate_selection still has to produce a value –it can't return None– so you pay at least a non-zero cost for calling it unnecessarily. If it's obvious that it will not perform any useful work, it makes sense to avoid it IMO.

Just for context, the queries I'm working on are quite case heavy. Any work we can save in the inner loop of the queries seems worthwhile.

// Treat 'NULL' as false value
let when_value = match when_value.null_count() {
0 => Cow::Borrowed(when_value),
Expand Down