Skip to content

Commit b130145

Browse files
committed
Sort with limit before topk
1 parent f2437d1 commit b130145

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! TopK: Combination of Sort / LIMIT
1919
2020
use arrow::{
21-
array::{Array, AsArray},
22-
compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder},
21+
array::{Array, AsArray, BooleanArray, UInt32Array},
22+
compute::{FilterBuilder, interleave_record_batch, prep_null_mask_filter, sort_to_indices},
2323
row::{RowConverter, Rows, SortField},
2424
};
2525
use datafusion_expr::{ColumnarValue, Operator};
@@ -172,6 +172,11 @@ fn build_sort_fields(
172172
.collect::<Result<_>>()
173173
}
174174

175+
enum TopKSelection {
176+
Boolean(BooleanArray),
177+
Indices(UInt32Array),
178+
}
179+
175180
impl TopK {
176181
/// Create a new [`TopK`] that stores the top `k` values, as
177182
/// defined by the sort expressions in `expr`.
@@ -251,7 +256,12 @@ impl TopK {
251256
// nothing to filter, so no need to update
252257
return Ok(());
253258
}
254-
// only update the keys / rows if the filter does not match all rows
259+
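// If there is only a single sort key, `k` is less than 20% of the batch's rows (so at least 80% of them cannot
260+
// enter the top k), and the existing filter is not very selective (more than `2 * k` rows still pass it), use `sort_to_indices` with a limit of `k` to get the top indices from the input batch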
261+
if sort_keys.len() == 1 && (self.heap.k as f64) < 0.2 * (num_rows as f64) && (true_count > self.heap.k * 2) {
262+
let array = sort_keys[0].as_ref();
263+
selected_rows = Some(TopKSelection::Indices(sort_to_indices(array, None, Some(self.heap.k))?));
264+
}
255265
if true_count < num_rows {
256266
// Indices in `set_indices` should be correct if filter contains nulls
257267
// So we prepare the filter here. Note this is also done in the `FilterBuilder`
@@ -267,7 +277,7 @@ impl TopK {
267277
} else {
268278
filter_predicate.build()
269279
};
270-
selected_rows = Some(filter);
280+
selected_rows = Some(TopKSelection::Boolean(filter));
271281
sort_keys = sort_keys
272282
.iter()
273283
.map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
@@ -281,9 +291,13 @@ impl TopK {
281291
let mut batch_entry = self.heap.register_batch(batch.clone());
282292

283293
let replacements = match selected_rows {
284-
Some(filter) => {
294+
Some(TopKSelection::Boolean(filter)) => {
285295
self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
286296
}
297+
Some(TopKSelection::Indices(indices)) => {
298+
self.find_new_topk_items(indices.values().iter().map(|i| *i as usize), &mut batch_entry)
299+
}
300+
287301
None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
288302
};
289303

0 commit comments

Comments
 (0)