Skip to content

Commit 2a3b131

Browse files
committed
pull schema from FileSource
1 parent 10ae665 commit 2a3b131

File tree

47 files changed

+439
-446
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+439
-446
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,11 @@ impl TableProvider for IndexTableProvider {
499499
// provide the factory to create parquet reader without re-reading metadata
500500
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
501501
);
502-
let file_scan_config =
503-
FileScanConfigBuilder::new(object_store_url, schema, file_source)
504-
.with_limit(limit)
505-
.with_projection_indices(projection.cloned())
506-
.with_file(partitioned_file)
507-
.build();
502+
let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_source)
503+
.with_limit(limit)
504+
.with_projection_indices(projection.cloned())
505+
.with_file(partitioned_file)
506+
.build();
508507

509508
// Finally, put it all together into a DataSourceExec
510509
Ok(DataSourceExec::from_data_source(file_scan_config))

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ async fn csv_opener() -> Result<()> {
6565

6666
let scan_config = FileScanConfigBuilder::new(
6767
ObjectStoreUrl::local_filesystem(),
68-
Arc::clone(&schema),
6968
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
7069
)
7170
.with_projection_indices(Some(vec![12, 0]))
@@ -131,8 +130,7 @@ async fn json_opener() -> Result<()> {
131130

132131
let scan_config = FileScanConfigBuilder::new(
133132
ObjectStoreUrl::local_filesystem(),
134-
schema,
135-
Arc::new(JsonSource::default()),
133+
Arc::new(JsonSource::new(schema)),
136134
)
137135
.with_projection_indices(Some(vec![1, 0]))
138136
.with_limit(Some(5))

datafusion-examples/examples/default_column_values.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,6 @@ impl TableProvider for DefaultValueTableProvider {
257257

258258
let file_scan_config = FileScanConfigBuilder::new(
259259
ObjectStoreUrl::parse("memory://")?,
260-
self.schema.clone(),
261260
Arc::new(parquet_source),
262261
)
263262
.with_projection_indices(projection.cloned())

datafusion-examples/examples/parquet_embedded_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl TableProvider for DistinctIndexTable {
429429
let source = Arc::new(
430430
ParquetSource::new(self.schema.clone()).with_enable_page_index(true),
431431
);
432-
let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
432+
let mut builder = FileScanConfigBuilder::new(url, source);
433433
for file in files_to_scan {
434434
let path = self.dir.join(file);
435435
let len = std::fs::metadata(&path)?.len();

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ impl TableProvider for IndexTableProvider {
246246
let source =
247247
Arc::new(ParquetSource::new(self.schema()).with_predicate(predicate));
248248
let mut file_scan_config_builder =
249-
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
249+
FileScanConfigBuilder::new(object_store_url, source)
250250
.with_projection_indices(projection.cloned())
251251
.with_limit(limit);
252252

datafusion/catalog-listing/src/table.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,6 @@ impl TableProvider for ListingTable {
430430
.map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
431431
.collect::<datafusion_common::Result<Vec<_>>>()?;
432432

433-
let table_schema =
434-
TableSchema::new(Arc::clone(&self.file_schema), table_partition_cols.clone());
435-
436433
let table_partition_col_names = table_partition_cols
437434
.iter()
438435
.map(|field| field.name().as_str())
@@ -503,19 +500,15 @@ impl TableProvider for ListingTable {
503500
.format
504501
.create_physical_plan(
505502
state,
506-
FileScanConfigBuilder::new(
507-
object_store_url,
508-
table_schema.clone(),
509-
file_source,
510-
)
511-
.with_file_groups(partitioned_file_lists)
512-
.with_constraints(self.constraints.clone())
513-
.with_statistics(statistics)
514-
.with_projection_indices(projection)
515-
.with_limit(limit)
516-
.with_output_ordering(output_ordering)
517-
.with_expr_adapter(self.expr_adapter_factory.clone())
518-
.build(),
503+
FileScanConfigBuilder::new(object_store_url, file_source)
504+
.with_file_groups(partitioned_file_lists)
505+
.with_constraints(self.constraints.clone())
506+
.with_statistics(statistics)
507+
.with_projection_indices(projection)
508+
.with_limit(limit)
509+
.with_output_ordering(output_ordering)
510+
.with_expr_adapter(self.expr_adapter_factory.clone())
511+
.build(),
519512
)
520513
.await?;
521514

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ pub(crate) mod test_util {
8888
state,
8989
FileScanConfigBuilder::new(
9090
ObjectStoreUrl::local_filesystem(),
91-
file_schema.clone(),
9291
format.file_source(table_schema),
9392
)
9493
.with_file_groups(file_groups)

datafusion/core/src/datasource/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,10 @@ mod tests {
127127
let source = ParquetSource::new(Arc::clone(&schema))
128128
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
129129
.unwrap();
130-
let base_conf = FileScanConfigBuilder::new(
131-
ObjectStoreUrl::local_filesystem(),
132-
schema,
133-
source,
134-
)
135-
.with_file(partitioned_file)
136-
.build();
130+
let base_conf =
131+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
132+
.with_file(partitioned_file)
133+
.build();
137134

138135
let parquet_exec = DataSourceExec::from_data_source(base_conf);
139136

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,10 @@ mod tests {
8282
.await?;
8383

8484
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
85-
let conf = FileScanConfigBuilder::new(
86-
ObjectStoreUrl::local_filesystem(),
87-
file_schema,
88-
source,
89-
)
90-
.with_file(meta.into())
91-
.with_projection_indices(Some(vec![0, 1, 2]))
92-
.build();
85+
let conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
86+
.with_file(meta.into())
87+
.with_projection_indices(Some(vec![0, 1, 2]))
88+
.build();
9389

9490
let source_exec = DataSourceExec::from_data_source(conf);
9591
assert_eq!(
@@ -158,7 +154,7 @@ mod tests {
158154
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
159155

160156
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
161-
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
157+
let conf = FileScanConfigBuilder::new(object_store_url, source)
162158
.with_file(meta.into())
163159
.with_projection_indices(projection)
164160
.build();
@@ -227,12 +223,12 @@ mod tests {
227223
partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
228224

229225
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
230-
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
231226
let table_schema = TableSchema::new(
232227
file_schema.clone(),
233228
vec![Arc::new(Field::new("date", DataType::Utf8, false))],
234229
);
235-
let conf = FileScanConfigBuilder::new(object_store_url, table_schema, source)
230+
let source = Arc::new(AvroSource::new(table_schema.clone()));
231+
let conf = FileScanConfigBuilder::new(object_store_url, source)
236232
// select specific columns of the files as well as the partitioning
237233
// column which is supposed to be the last column in the table schema.
238234
.with_projection_indices(projection)

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,12 @@ mod tests {
123123
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
124124
let source =
125125
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
126-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
127-
table_schema,
128-
file_groups,
129-
source,
130-
))
131-
.with_file_compression_type(file_compression_type)
132-
.with_newlines_in_values(false)
133-
.with_projection_indices(Some(vec![0, 2, 4]))
134-
.build();
126+
let config =
127+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
128+
.with_file_compression_type(file_compression_type)
129+
.with_newlines_in_values(false)
130+
.with_projection_indices(Some(vec![0, 2, 4]))
131+
.build();
135132

136133
assert_eq!(13, config.file_schema().fields().len());
137134
let csv = DataSourceExec::from_data_source(config);
@@ -198,15 +195,12 @@ mod tests {
198195
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
199196
let source =
200197
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
201-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
202-
table_schema,
203-
file_groups,
204-
source,
205-
))
206-
.with_newlines_in_values(false)
207-
.with_file_compression_type(file_compression_type.to_owned())
208-
.with_projection_indices(Some(vec![4, 0, 2]))
209-
.build();
198+
let config =
199+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
200+
.with_newlines_in_values(false)
201+
.with_file_compression_type(file_compression_type.to_owned())
202+
.with_projection_indices(Some(vec![4, 0, 2]))
203+
.build();
210204
assert_eq!(13, config.file_schema().fields().len());
211205
let csv = DataSourceExec::from_data_source(config);
212206
assert_eq!(3, csv.schema().fields().len());
@@ -272,15 +266,12 @@ mod tests {
272266
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
273267
let source =
274268
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
275-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
276-
table_schema,
277-
file_groups,
278-
source,
279-
))
280-
.with_newlines_in_values(false)
281-
.with_file_compression_type(file_compression_type.to_owned())
282-
.with_limit(Some(5))
283-
.build();
269+
let config =
270+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
271+
.with_newlines_in_values(false)
272+
.with_file_compression_type(file_compression_type.to_owned())
273+
.with_limit(Some(5))
274+
.build();
284275
assert_eq!(13, config.file_schema().fields().len());
285276
let csv = DataSourceExec::from_data_source(config);
286277
assert_eq!(13, csv.schema().fields().len());
@@ -345,15 +336,12 @@ mod tests {
345336
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
346337
let source =
347338
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
348-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
349-
table_schema,
350-
file_groups,
351-
source,
352-
))
353-
.with_newlines_in_values(false)
354-
.with_file_compression_type(file_compression_type.to_owned())
355-
.with_limit(Some(5))
356-
.build();
339+
let config =
340+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
341+
.with_newlines_in_values(false)
342+
.with_file_compression_type(file_compression_type.to_owned())
343+
.with_limit(Some(5))
344+
.build();
357345
assert_eq!(14, config.file_schema().fields().len());
358346
let csv = DataSourceExec::from_data_source(config);
359347
assert_eq!(14, csv.schema().fields().len());
@@ -410,23 +398,20 @@ mod tests {
410398
quote: b'"',
411399
..Default::default()
412400
};
413-
let source =
414-
Arc::new(CsvSource::new(Arc::clone(&file_schema)).with_csv_options(options));
415401
let table_schema = TableSchema::new(
416402
Arc::clone(&file_schema),
417403
vec![Arc::new(Field::new("date", DataType::Utf8, false))],
418404
);
419-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
420-
table_schema,
421-
file_groups,
422-
source,
423-
))
424-
.with_newlines_in_values(false)
425-
.with_file_compression_type(file_compression_type.to_owned())
426-
// We should be able to project on the partition column
427-
// Which is supposed to be after the file fields
428-
.with_projection_indices(Some(vec![0, num_file_schema_fields]))
429-
.build();
405+
let source =
406+
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
407+
let config =
408+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
409+
.with_newlines_in_values(false)
410+
.with_file_compression_type(file_compression_type.to_owned())
411+
// We should be able to project on the partition column
412+
// Which is supposed to be after the file fields
413+
.with_projection_indices(Some(vec![0, num_file_schema_fields]))
414+
.build();
430415

431416
// we don't have `/date=xx/` in the path but that is ok because
432417
// partitions are resolved during scan anyway
@@ -524,14 +509,11 @@ mod tests {
524509
let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema));
525510
let source =
526511
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
527-
let config = FileScanConfigBuilder::from(partitioned_csv_config(
528-
table_schema,
529-
file_groups,
530-
source,
531-
))
532-
.with_newlines_in_values(false)
533-
.with_file_compression_type(file_compression_type.to_owned())
534-
.build();
512+
let config =
513+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
514+
.with_newlines_in_values(false)
515+
.with_file_compression_type(file_compression_type.to_owned())
516+
.build();
535517
let csv = DataSourceExec::from_data_source(config);
536518

537519
let it = csv.execute(0, task_ctx).unwrap();

0 commit comments

Comments
 (0)