Skip to content

Commit 6eb8d45

Browse files
Let FileScanConfig own a list of ProjectionExprs (#18253)
## Which issue does this PR close? - Related to #14993 ## Rationale for this change To enable expression pushdown to file sources, we need to plumb expressions through the `FileScanConfig` layer. Currently, `FileScanConfig` only tracks column indices for projection, which limits us to simple and naive column selection. This PR begins expression pushdown implementation by having `FileScanConfig` own a list of `ProjectionExpr`s, instead of column indices. This allows file sources to eventually receive and optimize based on the actual expressions being projected. ## Notes about this PR - The first commit is based off of #18231 - To avoid a super large diff and a harder review, I've decided to break (#14993) into 2 tasks: - Have the `DataSource` (`FileScanConfig`) actually hold projection expressions (this PR) - Flow the projection expressions from `DataSourceExec` all the way to the `FileSource` --------- Co-authored-by: Adrian Garcia Badaracco <[email protected]>
1 parent 440fb82 commit 6eb8d45

File tree

23 files changed

+334
-116
lines changed

23 files changed

+334
-116
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider {
502502
let file_scan_config =
503503
FileScanConfigBuilder::new(object_store_url, schema, file_source)
504504
.with_limit(limit)
505-
.with_projection(projection.cloned())
505+
.with_projection_indices(projection.cloned())
506506
.with_file(partitioned_file)
507507
.build();
508508

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async fn csv_opener() -> Result<()> {
6060
Arc::clone(&schema),
6161
Arc::new(CsvSource::default()),
6262
)
63-
.with_projection(Some(vec![12, 0]))
63+
.with_projection_indices(Some(vec![12, 0]))
6464
.with_limit(Some(5))
6565
.with_file(PartitionedFile::new(path.display().to_string(), 10))
6666
.build();
@@ -126,7 +126,7 @@ async fn json_opener() -> Result<()> {
126126
schema,
127127
Arc::new(JsonSource::default()),
128128
)
129-
.with_projection(Some(vec![1, 0]))
129+
.with_projection_indices(Some(vec![1, 0]))
130130
.with_limit(Some(5))
131131
.with_file(PartitionedFile::new(path.to_string(), 10))
132132
.build();

datafusion-examples/examples/default_column_values.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ impl TableProvider for DefaultValueTableProvider {
260260
self.schema.clone(),
261261
Arc::new(parquet_source),
262262
)
263-
.with_projection(projection.cloned())
263+
.with_projection_indices(projection.cloned())
264264
.with_limit(limit)
265265
.with_file_group(file_group)
266266
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider {
246246
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
247247
let mut file_scan_config_builder =
248248
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
249-
.with_projection(projection.cloned())
249+
.with_projection_indices(projection.cloned())
250250
.with_limit(limit);
251251

252252
// Transform to the format needed to pass to DataSourceExec

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ impl TableProvider for ListingTable {
499499
.with_file_groups(partitioned_file_lists)
500500
.with_constraints(self.constraints.clone())
501501
.with_statistics(statistics)
502-
.with_projection(projection)
502+
.with_projection_indices(projection)
503503
.with_limit(limit)
504504
.with_output_ordering(output_ordering)
505505
.with_table_partition_cols(table_partition_cols)

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub(crate) mod test_util {
9090
)
9191
.with_file_groups(file_groups)
9292
.with_statistics(statistics)
93-
.with_projection(projection)
93+
.with_projection_indices(projection)
9494
.with_limit(limit)
9595
.build(),
9696
)

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ mod tests {
8888
source,
8989
)
9090
.with_file(meta.into())
91-
.with_projection(Some(vec![0, 1, 2]))
91+
.with_projection_indices(Some(vec![0, 1, 2]))
9292
.build();
9393

9494
let source_exec = DataSourceExec::from_data_source(conf);
@@ -160,7 +160,7 @@ mod tests {
160160
let source = Arc::new(AvroSource::new());
161161
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
162162
.with_file(meta.into())
163-
.with_projection(projection)
163+
.with_projection_indices(projection)
164164
.build();
165165

166166
let source_exec = DataSourceExec::from_data_source(conf);
@@ -231,7 +231,7 @@ mod tests {
231231
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
232232
// select specific columns of the files as well as the partitioning
233233
// column which is supposed to be the last column in the table schema.
234-
.with_projection(projection)
234+
.with_projection_indices(projection)
235235
.with_file(partitioned_file)
236236
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
237237
.build();

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ mod tests {
118118
))
119119
.with_file_compression_type(file_compression_type)
120120
.with_newlines_in_values(false)
121-
.with_projection(Some(vec![0, 2, 4]))
121+
.with_projection_indices(Some(vec![0, 2, 4]))
122122
.build();
123123

124124
assert_eq!(13, config.file_schema().fields().len());
@@ -183,7 +183,7 @@ mod tests {
183183
))
184184
.with_newlines_in_values(false)
185185
.with_file_compression_type(file_compression_type.to_owned())
186-
.with_projection(Some(vec![4, 0, 2]))
186+
.with_projection_indices(Some(vec![4, 0, 2]))
187187
.build();
188188
assert_eq!(13, config.file_schema().fields().len());
189189
let csv = DataSourceExec::from_data_source(config);
@@ -373,7 +373,7 @@ mod tests {
373373
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
374374
// We should be able to project on the partition column
375375
// Which is supposed to be after the file fields
376-
.with_projection(Some(vec![0, num_file_schema_fields]))
376+
.with_projection_indices(Some(vec![0, num_file_schema_fields]))
377377
.build();
378378

379379
// we don't have `/date=xx/` in the path but that is ok because

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ mod tests {
297297
let source = Arc::new(JsonSource::new());
298298
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
299299
.with_file_groups(file_groups)
300-
.with_projection(Some(vec![0, 2]))
300+
.with_projection_indices(Some(vec![0, 2]))
301301
.with_file_compression_type(file_compression_type.to_owned())
302302
.build();
303303
let exec = DataSourceExec::from_data_source(conf);
@@ -345,7 +345,7 @@ mod tests {
345345
let source = Arc::new(JsonSource::new());
346346
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
347347
.with_file_groups(file_groups)
348-
.with_projection(Some(vec![3, 0, 2]))
348+
.with_projection_indices(Some(vec![3, 0, 2]))
349349
.with_file_compression_type(file_compression_type.to_owned())
350350
.build();
351351
let exec = DataSourceExec::from_data_source(conf);

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ mod tests {
201201
source,
202202
)
203203
.with_file_group(file_group)
204-
.with_projection(self.projection.clone())
204+
.with_projection_indices(self.projection.clone())
205205
.build();
206206
DataSourceExec::from_data_source(base_config)
207207
}
@@ -1655,7 +1655,7 @@ mod tests {
16551655
let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source)
16561656
.with_file(partitioned_file)
16571657
// file has 10 cols so index 12 should be month and 13 should be day
1658-
.with_projection(Some(vec![0, 1, 2, 12, 13]))
1658+
.with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
16591659
.with_table_partition_cols(vec![
16601660
Field::new("year", DataType::Utf8, false),
16611661
Field::new("month", DataType::UInt8, false),

0 commit comments

Comments
 (0)