perf: Respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config #1619
Conversation
@mbutrovich @parthchandra I am not very familiar with the code that I updated. Could you review?
let mut table_parquet_options = TableParquetOptions::new();
table_parquet_options.global.pushdown_filters = pushdown_filters;
// TODO: Maybe these are configs?
I think we can remove the TODO, and maybe get rid of the reorder_filters = true on the line below. DF still defaults that to false as well, so we might not understand the performance implications of it yet. We could add a config for that in a follow-up PR and measure the difference.
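
For context, a minimal sketch of what this looks like on the DataFusion side with reorder_filters left at its default; the import path and helper name are illustrative, not code from this PR:

use datafusion::config::TableParquetOptions;

// Illustrative helper: build Parquet scan options that honor the value of
// Spark's PARQUET_FILTER_PUSHDOWN_ENABLED passed in from the JVM side.
fn build_parquet_options(pushdown_filters: bool) -> TableParquetOptions {
    let mut opts = TableParquetOptions::new();
    opts.global.pushdown_filters = pushdown_filters;
    // reorder_filters stays at DataFusion's default (false) until its
    // performance impact is measured in a follow-up PR.
    opts
}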
Thanks. I have updated this and also added some notes to the tuning guide.
Other than the one optional suggestion, this LGTM.
Now that I think about it more: should this be a distinct config from Spark's (i.e., a Comet config, like we might do for reorder_filters)? If we fall back to the Spark scan, are we hurting performance when this is set to false (as the tuning guide suggests)? Maybe we expect that anyone enabling the experimental reader already knows their workload does not fall back to Spark for scans. We can revisit these configs and defaults in the future, I suppose. For now I think this is reasonable for benchmarking the experimental readers.
I was also thinking about this. If we add a Comet configuration, we can set the default to disable the filter pushdown for now. I will make this change.
spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala (review thread resolved)
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@             Coverage Diff             @@
##              main    #1619      +/-  ##
============================================
+ Coverage     56.12%   59.07%   +2.94%
- Complexity      976     1072      +96
============================================
  Files           119      125       +6
  Lines         11743    12499     +756
  Branches       2251     2340      +89
============================================
+ Hits           6591     7384     +793
+ Misses         4012     3950      -62
- Partials       1140     1165      +25
We do need to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config.
I don't see where PARQUET_FILTER_PUSHDOWN_ENABLED is used.
Nvm. It is used to decide whether to pass in the predicates to the batch reader, which uses them to filter out row groups. Note that the behavior of the native code must match this exactly, especially for row index generation. So if we want to have a Comet config, it must have the same value as the Spark config, which makes it redundant.
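
To make that requirement concrete, here is a hypothetical sketch (the function and its inputs are illustrative, not code from this PR): row indexes are derived from the offsets of the surviving row groups, so both sides must keep exactly the same ones.

// Hypothetical check of the invariant described above: JVM-side and
// native-side row-group pruning must agree, or the generated row indexes
// silently diverge.
fn check_pruning_matches(jvm_kept: &[usize], native_kept: &[usize]) {
    assert_eq!(
        jvm_kept, native_kept,
        "JVM and native row-group pruning must match exactly"
    );
}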
@@ -257,7 +257,8 @@ public static native long initRecordBatchReader(
      byte[] requiredSchema,
      byte[] dataSchema,
      String sessionTimezone,
-     int batchSize);
+     int batchSize,
+     boolean pushdownFilters);
This is already implied if the filter parameter is null.
Thanks. I will update this soon.
native/core/src/parquet/mod.rs (outdated)
@@ -703,6 +704,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
      file_groups,
      None,
      data_filters,
+     pushdown_filters != 0,
Same here. data_filters is an Option and should be None if filter pushdown is disabled.
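
A minimal sketch of that suggestion, assuming the caller gates the predicates before they cross the JNI boundary (the helper is illustrative, not code from this PR):

// None already encodes "pushdown disabled", so no extra boolean needs to
// cross the JNI boundary.
fn effective_filters<T>(data_filters: Option<T>, pushdown_enabled: bool) -> Option<T> {
    if pushdown_enabled {
        data_filters
    } else {
        None
    }
}

With this shape, the native reader only needs to check whether data_filters is Some.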
I addressed the feedback, but I no longer see a performance improvement with […]
I was testing with the wrong Comet JAR file 🤦♂️ This is ready for re-review @mbutrovich @parthchandra
Which issue does this PR close?
N/A
Rationale for this change
My primary motivation was to be able to run benchmarks with the new scans with and without Parquet pushdown enabled, since we know that there are performance issues.
What changes are included in this PR?
Respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config.
How are these changes tested?
I ran TPC-H @ 100 GB locally with the native_datafusion scan.
With pushdown enabled: 328 s
With pushdown disabled: 270 s