Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Filter Parquet pages with ParquetColumnExpr #20714

Merged
merged 14 commits into from
Jan 27, 2025
38 changes: 38 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,44 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// Returns a `MutableBinaryViewValueIter` built from a shared reference to this array.
pub fn values_iter(&self) -> MutableBinaryViewValueIter<T> {
MutableBinaryViewValueIter::new(self)
}

/// Appends all elements of `other` to the end of this mutable array.
///
/// The validity bitmap, the views, the data buffers and the running
/// `total_buffer_len` / `total_bytes_len` counters are all updated.
pub fn extend_from_array(&mut self, other: &BinaryViewArrayGeneric<T>) {
    // Length before anything is appended; needed when a validity bitmap has to be
    // materialized retroactively for the elements already stored.
    let len_before = self.len();

    match (&mut self.validity, other.validity()) {
        // Neither side tracks validity: nothing to do.
        (None, None) => {},
        // Only we track validity: every incoming element is valid.
        (Some(validity), None) => validity.extend_constant(other.len(), true),
        // Both sides track validity: append the incoming bits.
        (Some(validity), Some(incoming)) => validity.extend_from_bitmap(incoming),
        // Only `other` tracks validity: create our bitmap, mark all existing
        // elements as valid, then append the incoming bits.
        (slot @ None, Some(incoming)) => {
            let mut bitmap = MutableBitmap::with_capacity(len_before + incoming.len());
            bitmap.extend_constant(len_before, true);
            bitmap.extend_from_bitmap(incoming);
            *slot = Some(bitmap);
        },
    }

    if other.total_buffer_len() != 0 {
        // Flush the in-progress buffer first so that the incoming buffers are
        // appended after everything we already own.
        self.finish_in_progress();

        // Index shift applied to every incoming view that points into a buffer.
        // NOTE(review): `as u32` truncates silently should the buffer count ever
        // exceed `u32::MAX`.
        let idx_shift = self.completed_buffers().len() as u32;
        self.completed_buffers
            .extend(other.data_buffers().iter().cloned());

        self.views.extend(other.views().iter().copied().map(|mut shifted| {
            // Views no longer than `View::MAX_INLINE_SIZE` are copied unchanged;
            // only longer ones get their buffer index shifted.
            if shifted.length > View::MAX_INLINE_SIZE {
                shifted.buffer_idx += idx_shift;
            }
            shifted
        }));

        self.total_buffer_len = self.total_buffer_len() + other.total_buffer_len();
    } else {
        // `other` reports a total buffer length of zero, so its views are
        // copied verbatim without any index rewriting.
        self.views.extend(other.views().iter().copied());
    }

    self.total_bytes_len = self.total_bytes_len() + other.total_bytes_len();
}
}

impl MutableBinaryViewArray<[u8]> {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/equal/null.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::array::{Array, NullArray};
use crate::array::NullArray;

#[inline]
pub(super) fn equal(lhs: &NullArray, rhs: &NullArray) -> bool {
Expand Down
17 changes: 17 additions & 0 deletions crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,23 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {

dyn_clone::clone_trait_object!(Array);

/// Conversion of a value into an owned `Box<dyn Array>` trait object.
pub trait IntoBoxedArray {
    /// Consumes `self` and returns it as a boxed [`Array`] trait object.
    fn into_boxed(self) -> Box<dyn Array>;
}

/// Every concrete [`Array`] implementor is converted by wrapping it in a new `Box`.
impl<A> IntoBoxedArray for A
where
    A: Array,
{
    #[inline(always)]
    fn into_boxed(self) -> Box<dyn Array> {
        let boxed: Box<dyn Array> = Box::new(self);
        boxed
    }
}

/// A value that is already a `Box<dyn Array>` is returned as is.
impl IntoBoxedArray for Box<dyn Array> {
    #[inline(always)]
    fn into_boxed(self) -> Box<dyn Array> {
        self
    }
}

/// A trait describing a mutable array; i.e. an array whose values can be changed.
///
/// Mutable arrays cannot be cloned but can be mutated in place,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl NullArray {
}

#[inline]
fn len(&self) -> usize {
pub fn len(&self) -> usize {
self.length
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl Scalar {
self.value.is_nan()
}

/// Consumes the `Scalar` and returns its `value` field by move.
#[inline(always)]
pub fn into_value(self) -> AnyValue<'static> {
self.value
}

#[inline(always)]
pub fn value(&self) -> &AnyValue<'static> {
&self.value
Expand Down
20 changes: 20 additions & 0 deletions crates/polars-expr/src/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,16 @@ impl PhysicalExpr for AggregationExpr {
self.input.collect_live_columns(lv);
}

/// `isolate_column_expr` for `AggregationExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Always reports `true` for this expression type.
fn is_scalar(&self) -> bool {
true
}
Expand Down Expand Up @@ -740,6 +750,16 @@ impl PhysicalExpr for AggQuantileExpr {
self.quantile.collect_live_columns(lv);
}

/// `isolate_column_expr` for `AggQuantileExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Resolves the output field by delegating to `self.input.to_field(input_schema)`;
/// the `quantile` expression is not consulted.
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ impl PhysicalExpr for AliasExpr {
lv.insert(self.name.clone());
}

/// `isolate_column_expr` for `AliasExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(
self.name.clone(),
Expand Down
48 changes: 48 additions & 0 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,54 @@ impl PhysicalExpr for ApplyExpr {
}
}

/// `isolate_column_expr` for `ApplyExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Rewrites the inputs of an element-wise apply through `const_columns`.
///
/// Returns `None` when `collect_groups` is not `ApplyOptions::ElementWise` or when
/// none of the inputs produced a replacement. Otherwise returns a clone of this
/// expression whose `inputs` hold the replaced expressions; inputs that reported
/// no replacement are carried over as clones.
fn replace_elementwise_const_columns(
    &self,
    const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    if self.collect_groups != ApplyOptions::ElementWise {
        return None;
    }

    // Locate the first input that actually changes. Nothing is allocated or
    // copied until one is found; if there is none we bail out with `None`.
    let (first_idx, first_new) = self
        .inputs
        .iter()
        .enumerate()
        .find_map(|(idx, input)| {
            input
                .replace_elementwise_const_columns(const_columns)
                .map(|new| (idx, new))
        })?;

    let mut new_inputs = Vec::with_capacity(self.inputs.len());
    // Inputs in front of the first hit already reported "no replacement".
    new_inputs.extend(self.inputs[..first_idx].iter().cloned());
    new_inputs.push(first_new);
    // The remaining inputs have not been visited yet.
    new_inputs.extend(self.inputs[first_idx + 1..].iter().map(|input| {
        input
            .replace_elementwise_const_columns(const_columns)
            .unwrap_or_else(|| input.clone())
    }));

    let mut slf = self.clone();
    slf.inputs = new_inputs;
    Some(Arc::new(slf))
}

/// Resolves the output field by delegating to `self.expr.to_field` with
/// `Context::Default`.
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
49 changes: 49 additions & 0 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use polars_core::prelude::*;
use polars_core::POOL;
use polars_io::predicates::SpecializedColumnPredicateExpr;
#[cfg(feature = "round_series")]
use polars_ops::prelude::floor_div_series;

Expand Down Expand Up @@ -273,6 +274,54 @@ impl PhysicalExpr for BinaryExpr {
self.right.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
let other = match (self.left.to_column(), self.right.to_column()) {
(Some(col), None) if col.as_str() == name => &self.right,
(None, Some(col)) if col.as_str() == name => &self.left,
_ => return None,
};

let value = other.evaluate_inline()?;
let value = value.as_scalar_column()?;

let scalar = value.scalar();
use Operator as O;
let specialized = match self.op {
O::Eq => Some(SpecializedColumnPredicateExpr::Eq(scalar.clone())),
O::EqValidity => Some(SpecializedColumnPredicateExpr::EqMissing(scalar.clone())),
_ => None,
};

Some((Arc::new(self.clone()) as _, specialized))
}

/// Applies `replace_elementwise_const_columns` to both operands.
///
/// Returns `None` when neither operand reported a replacement. Otherwise returns
/// a clone of this expression in which every operand that did report one has
/// been swapped for its replacement.
fn replace_elementwise_const_columns(
    &self,
    const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    let new_left = self.left.replace_elementwise_const_columns(const_columns);
    let new_right = self.right.replace_elementwise_const_columns(const_columns);

    // Only clone the expression when at least one side actually changed.
    if new_left.is_none() && new_right.is_none() {
        return None;
    }

    let mut rewritten = self.clone();
    if let Some(left) = new_left {
        rewritten.left = left;
    }
    if let Some(right) = new_right {
        rewritten.right = right;
    }
    Some(Arc::new(rewritten))
}

/// Resolves the output field by delegating to `self.expr.to_field` with
/// `Context::Default`.
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ impl PhysicalExpr for CastExpr {
self.input.collect_live_columns(lv);
}

/// `isolate_column_expr` for `CastExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema).map(|mut fld| {
fld.coerce(self.dtype.clone());
Expand Down
27 changes: 27 additions & 0 deletions crates/polars-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,33 @@ impl PhysicalExpr for ColumnExpr {
lv.insert(self.name.clone());
}

/// `isolate_column_expr` for `ColumnExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: a bare
/// column reference never yields an isolated expression or a specialized
/// predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Always returns `Some`, holding a reference to this expression's `name` field.
fn to_column(&self) -> Option<&PlSmallStr> {
Some(&self.name)
}

/// Replaces this column reference with a literal when its value is known.
///
/// Looks up `self.name` in `const_columns`. If an entry exists, returns a
/// `LiteralExpr` built from a clone of that value and a clone of `self.expr`;
/// otherwise returns `None`.
fn replace_elementwise_const_columns(
    &self,
    const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    const_columns.get(&self.name).map(|known_value| {
        let literal = LiteralExpr::new(
            LiteralValue::from(known_value.clone()),
            self.expr.clone(),
        );
        Arc::new(literal) as Arc<dyn PhysicalExpr>
    })
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
input_schema.get_field(&self.name).ok_or_else(|| {
polars_err!(
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ impl PhysicalExpr for CountExpr {

/// No-op: the body is empty, so nothing is ever inserted into `_lv`.
fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

/// `isolate_column_expr` for `CountExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Returns a fixed field named by the `LEN` constant with dtype `IDX_DTYPE`;
/// the input schema is not consulted.
fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(PlSmallStr::from_static(LEN), IDX_DTYPE))
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ impl PhysicalExpr for FilterExpr {
self.by.collect_live_columns(lv);
}

/// `isolate_column_expr` for `FilterExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Resolves the output field by delegating to `self.input.to_field(input_schema)`.
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ impl PhysicalExpr for GatherExpr {
self.idx.collect_live_columns(lv);
}

/// `isolate_column_expr` for `GatherExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

/// Resolves the output field by delegating to `self.phys_expr.to_field(input_schema)`.
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.phys_expr.to_field(input_schema)
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ impl PhysicalExpr for LiteralExpr {

/// No-op: the body is empty, so nothing is ever inserted into `_lv`.
fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

/// `isolate_column_expr` for `LiteralExpr`.
///
/// The `_name` argument is ignored and the result is always `None`: this
/// expression never yields an isolated expression or a specialized predicate.
fn isolate_column_expr(
&self,
_name: &str,
) -> Option<(
Arc<dyn PhysicalExpr>,
Option<SpecializedColumnPredicateExpr>,
)> {
None
}

fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
let dtype = self.0.get_datatype();
Ok(Field::new(PlSmallStr::from_static("literal"), dtype))
Expand Down
Loading
Loading